Prompt Compression Techniques: Fitting More Context in Fewer Tokens

Introduction: Context windows are limited and tokens are expensive. Long prompts with extensive context, examples, or retrieved documents quickly hit limits and drive up costs. Prompt compression techniques reduce token count while preserving the information LLMs need to generate quality responses. This guide covers practical compression strategies: token pruning to remove low-information tokens, extractive summarization to keep only essential sentences, abstractive compression using smaller models, and soft prompt techniques that encode information in learned embeddings. Whether you’re building RAG systems with large document contexts or optimizing API costs, prompt compression can reduce tokens by 50-80% with minimal quality loss.

Prompt Compression
Prompt Compression: Token Pruning, Extractive Summary, Soft Prompts

Token Pruning

from dataclasses import dataclass
from typing import Any, Optional
from abc import ABC, abstractmethod
import re

@dataclass
class CompressionResult:
    """Outcome of one compression pass: both texts plus their token counts."""

    original: str  # text as it was handed to the compressor
    compressed: str  # text produced by the compressor
    original_tokens: int  # token count of ``original``
    compressed_tokens: int  # token count of ``compressed``
    compression_ratio: float  # supplied by the producer; not derived here

    @property
    def tokens_saved(self) -> int:
        """Return how many tokens were removed (original minus compressed)."""
        saved = self.original_tokens - self.compressed_tokens
        return saved

class PromptCompressor(ABC):
    """Abstract base class for prompt compressors."""

    @abstractmethod
    async def compress(
        self,
        text: str,
        target_tokens: int = None,
        target_ratio: float = None
    ) -> CompressionResult:
        """Compress ``text`` and describe the outcome.

        Args:
            text: Text to compress.
            target_tokens: Optional absolute token budget for the output.
            target_ratio: Optional output/input token ratio to aim for.

        Returns:
            A ``CompressionResult``; concrete subclasses provide the logic.
        """
        ...

    def _count_tokens(self, text: str, tokenizer: Any = None) -> int:
        """Return the token count of ``text``.

        With a (truthy) ``tokenizer`` the count is the length of
        ``tokenizer.encode(text)``; otherwise it is estimated as one token
        per four characters.
        """
        if not tokenizer:
            # Rough estimate: four characters per token.
            return len(text) // 4
        encoded = tokenizer.encode(text)
        return len(encoded)

class StopwordPruner(PromptCompressor):
    """Remove stopwords and low-information filler phrases.

    Compression is purely rule-based: filler phrases are blanked out with
    regular expressions, then every word listed in ``self.stopwords`` is
    dropped unless it is the first word of a sentence.
    """

    def __init__(self, tokenizer: Any = None):
        """Store the optional tokenizer and build the word/pattern tables.

        Args:
            tokenizer: Optional object exposing ``encode(text)``; forwarded
                to ``_count_tokens`` when counting tokens.
        """
        self.tokenizer = tokenizer

        self.stopwords = {
            "the", "a", "an", "is", "are", "was", "were", "be", "been",
            "being", "have", "has", "had", "do", "does", "did", "will",
            "would", "could", "should", "may", "might", "must", "shall",
            "can", "need", "dare", "ought", "used", "to", "of", "in",
            "for", "on", "with", "at", "by", "from", "as", "into",
            "through", "during", "before", "after", "above", "below",
            "between", "under", "again", "further", "then", "once",
            "here", "there", "when", "where", "why", "how", "all",
            "each", "few", "more", "most", "other", "some", "such",
            "no", "nor", "not", "only", "own", "same", "so", "than",
            "too", "very", "just", "also", "now", "and", "but", "or",
            "because", "until", "while", "although", "though", "if"
        }

        self.filler_patterns = [
            r"\b(basically|actually|literally|really|very|quite|rather)\b",
            r"\b(in order to|due to the fact that|for the purpose of)\b",
            r"\b(it is|there is|there are)\b",
            r"\s+",  # Multiple spaces
        ]

    async def compress(
        self,
        text: str,
        target_tokens: int = None,
        target_ratio: float = None
    ) -> CompressionResult:
        """Strip filler phrases and stopwords from ``text``.

        ``target_tokens`` and ``target_ratio`` are accepted for interface
        compatibility but are not used: the amount removed depends only on
        the text itself.

        Returns:
            A ``CompressionResult``; the ratio is 1.0 when the original
            token count is zero.
        """
        original_tokens = self._count_tokens(text, self.tokenizer)

        # Blank out filler phrases (case-insensitive); the final pattern
        # also collapses whitespace runs.
        compressed = text
        for pattern in self.filler_patterns:
            compressed = re.sub(pattern, " ", compressed, flags=re.IGNORECASE)

        words = compressed.split()
        terminators = (".", "!", "?")
        filtered = []

        for i, word in enumerate(words):
            # The first word of every sentence is always kept.
            starts_sentence = i == 0 or words[i - 1].endswith(terminators)
            bare = word.lower().strip(".,!?;:")

            if starts_sentence or bare not in self.stopwords:
                filtered.append(word)
                continue

            # The stopword is dropped, but a sentence terminator attached to
            # it must survive; otherwise two sentences silently merge.
            tail = word[len(word.rstrip(".!?")):]
            if tail and filtered and not filtered[-1].endswith(terminators):
                filtered[-1] += tail

        # Words come from split(), so a single-space join needs no cleanup.
        compressed = " ".join(filtered)

        compressed_tokens = self._count_tokens(compressed, self.tokenizer)

        return CompressionResult(
            original=text,
            compressed=compressed,
            original_tokens=original_tokens,
            compressed_tokens=compressed_tokens,
            compression_ratio=compressed_tokens / original_tokens if original_tokens > 0 else 1.0
        )

class ImportanceBasedPruner(PromptCompressor):
    """Keep the highest-scoring sentences that fit inside a token budget."""

    def __init__(
        self,
        embedding_model: Any = None,
        tokenizer: Any = None
    ):
        """Store the optional embedding model and tokenizer.

        Args:
            embedding_model: Optional object with an awaitable
                ``embed(list_of_texts)`` method. Without it a length and
                keyword heuristic is used for scoring.
            tokenizer: Optional tokenizer forwarded to ``_count_tokens``.
        """
        self.embedding_model = embedding_model
        self.tokenizer = tokenizer

    async def compress(
        self,
        text: str,
        target_tokens: int = None,
        target_ratio: float = 0.5
    ) -> CompressionResult:
        """Select the most important sentences up to the token budget.

        Args:
            text: Text to compress.
            target_tokens: Absolute token budget; takes precedence over
                ``target_ratio`` whenever it is not ``None`` (including 0).
            target_ratio: Fraction of the original token count to keep when
                ``target_tokens`` is not given.

        Returns:
            A ``CompressionResult`` whose text contains the selected
            sentences in their original order, joined by single spaces.
        """
        original_tokens = self._count_tokens(text, self.tokenizer)

        sentences = self._split_sentences(text)

        # ``scored`` is in sentence order, so list positions double as the
        # original ordering key.
        scored = await self._score_sentences(sentences)

        if target_tokens is not None:
            budget = target_tokens
        else:
            budget = int(original_tokens * target_ratio)

        # Visit positions from highest to lowest score. sorted() is stable,
        # so equally scored sentences keep their document order.
        ranked = sorted(
            range(len(scored)),
            key=lambda pos: scored[pos][1],
            reverse=True
        )

        kept = []
        used = 0
        for pos in ranked:
            cost = self._count_tokens(scored[pos][0], self.tokenizer)
            # Greedy fill: a sentence that does not fit is skipped, but
            # later (shorter) ones may still be added.
            if used + cost <= budget:
                kept.append(pos)
                used += cost

        # Restore document order by position. Searching the text for each
        # sentence would map duplicate sentences to the same offset.
        kept.sort()
        compressed = " ".join(scored[pos][0] for pos in kept)

        compressed_tokens = self._count_tokens(compressed, self.tokenizer)

        return CompressionResult(
            original=text,
            compressed=compressed,
            original_tokens=original_tokens,
            compressed_tokens=compressed_tokens,
            compression_ratio=compressed_tokens / original_tokens if original_tokens > 0 else 1.0
        )

    def _split_sentences(self, text: str) -> list[str]:
        """Split on whitespace that follows ``.``, ``!`` or ``?``."""

        sentences = re.split(r'(?<=[.!?])\s+', text)
        return [s.strip() for s in sentences if s.strip()]

    async def _score_sentences(
        self,
        sentences: list[str]
    ) -> list[tuple[str, float]]:
        """Return ``(sentence, score)`` pairs in the order of ``sentences``.

        Without an embedding model the score is ``word_count / 20`` plus 0.2
        for each keyword occurring as a substring of the lower-cased
        sentence. With a model, the score is the cosine similarity between
        the sentence embedding and the centroid of all embeddings.
        """
        if not sentences:
            return []

        if not self.embedding_model:
            keywords = {"important", "key", "main", "critical", "essential", "must", "should"}
            scored = []

            for sentence in sentences:
                score = len(sentence.split()) / 20  # Length score
                lowered = sentence.lower()

                # Keyword bonus
                for keyword in keywords:
                    if keyword in lowered:
                        score += 0.2

                scored.append((sentence, score))

            return scored

        import numpy as np

        embeddings = np.asarray(await self.embedding_model.embed(sentences))

        # Sentences close to the centroid are treated as the most central.
        centroid = np.mean(embeddings, axis=0)
        centroid_norm = np.linalg.norm(centroid)

        scored = []
        for sentence, embedding in zip(sentences, embeddings):
            denom = np.linalg.norm(embedding) * centroid_norm
            # A zero-norm vector would yield NaN and corrupt the ranking.
            if denom > 0:
                similarity = float(np.dot(embedding, centroid) / denom)
            else:
                similarity = 0.0
            scored.append((sentence, similarity))

        return scored

Extractive Compression

from dataclasses import dataclass
from typing import Any, Optional
import numpy as np

class TextRankCompressor(PromptCompressor):
    """TextRank-based extractive compression.

    Sentences are nodes of a similarity graph; a PageRank-style iteration
    ranks them and the best-ranked sentences that fit the token budget are
    kept in their original order.
    """

    def __init__(
        self,
        embedding_model: Any = None,
        damping: float = 0.85,
        iterations: int = 100
    ):
        """Configure the ranker.

        Args:
            embedding_model: Optional object with an awaitable
                ``embed(list_of_texts)`` method. Without it, similarity is
                the Jaccard overlap of lower-cased word sets.
            damping: Damping factor of the rank iteration.
            iterations: Maximum number of rank iterations.
        """
        self.embedding_model = embedding_model
        self.damping = damping
        self.iterations = iterations

    async def compress(
        self,
        text: str,
        target_tokens: int = None,
        target_ratio: float = 0.5
    ) -> CompressionResult:
        """Compress ``text`` by keeping its top-ranked sentences.

        Texts of at most two sentences are returned unchanged.

        Args:
            text: Text to compress.
            target_tokens: Absolute token budget; takes precedence over
                ``target_ratio`` whenever it is not ``None`` (including 0).
            target_ratio: Fraction of the original tokens to keep otherwise.
        """
        original_tokens = self._count_tokens(text)

        sentences = self._split_sentences(text)

        if len(sentences) <= 2:
            # Too short to rank meaningfully: hand the text back as is.
            return CompressionResult(
                original=text,
                compressed=text,
                original_tokens=original_tokens,
                compressed_tokens=original_tokens,
                compression_ratio=1.0
            )

        similarity_matrix = await self._build_similarity_matrix(sentences)
        scores = self._textrank(similarity_matrix)

        if target_tokens is not None:
            target = target_tokens
        else:
            target = int(original_tokens * target_ratio)

        # Highest score first; the sort is stable for equal scores.
        ranked = sorted(
            enumerate(sentences),
            key=lambda x: scores[x[0]],
            reverse=True
        )

        selected_indices = []
        current_tokens = 0

        for idx, sentence in ranked:
            sentence_tokens = self._count_tokens(sentence)
            # Greedy fill: skip what does not fit, keep trying the rest.
            if current_tokens + sentence_tokens <= target:
                selected_indices.append(idx)
                current_tokens += sentence_tokens

        # Restore document order.
        selected_indices.sort()
        compressed = " ".join(sentences[i] for i in selected_indices)

        compressed_tokens = self._count_tokens(compressed)

        return CompressionResult(
            original=text,
            compressed=compressed,
            original_tokens=original_tokens,
            compressed_tokens=compressed_tokens,
            compression_ratio=compressed_tokens / original_tokens if original_tokens > 0 else 1.0
        )

    def _split_sentences(self, text: str) -> list[str]:
        """Split on whitespace that follows ``.``, ``!`` or ``?``."""

        sentences = re.split(r'(?<=[.!?])\s+', text)
        return [s.strip() for s in sentences if s.strip()]

    async def _build_similarity_matrix(
        self,
        sentences: list[str]
    ) -> np.ndarray:
        """Return an ``n x n`` non-negative similarity matrix with a zero diagonal."""

        n = len(sentences)

        if self.embedding_model:
            embeddings = np.array(
                await self.embedding_model.embed(sentences), dtype=float
            )

            # Normalise rows. Zero vectors stay zero instead of becoming NaN.
            norms = np.linalg.norm(embeddings, axis=1, keepdims=True)
            norms[norms == 0] = 1.0
            embeddings = embeddings / norms

            # Cosine similarity. Negative values are clamped to zero because
            # the rank iteration assumes non-negative edge weights (column
            # sums could otherwise be zero or negative).
            similarity = np.maximum(np.dot(embeddings, embeddings.T), 0.0)
        else:
            # Jaccard overlap of word sets; build each set once.
            word_sets = [set(s.lower().split()) for s in sentences]
            similarity = np.zeros((n, n))

            for i in range(n):
                for j in range(n):
                    if i != j:
                        union = len(word_sets[i] | word_sets[j])
                        overlap = len(word_sets[i] & word_sets[j])
                        similarity[i, j] = overlap / union if union > 0 else 0

        # A sentence must not vote for itself.
        np.fill_diagonal(similarity, 0)

        return similarity

    def _textrank(self, similarity_matrix: np.ndarray) -> np.ndarray:
        """Run the damped rank iteration and return one score per sentence."""

        n = similarity_matrix.shape[0]

        # Column-normalise so each sentence distributes a total weight of 1;
        # columns without any edge are left as all zeros.
        col_sums = similarity_matrix.sum(axis=0, keepdims=True)
        col_sums[col_sums == 0] = 1
        transition = similarity_matrix / col_sums

        scores = np.ones(n) / n

        for _ in range(self.iterations):
            new_scores = (1 - self.damping) / n + self.damping * transition.dot(scores)

            # Stop early once the scores have converged.
            if np.allclose(scores, new_scores):
                break

            scores = new_scores

        return scores

class QueryFocusedCompressor(PromptCompressor):
    """Compress context by keeping the chunks most relevant to a query."""

    def __init__(
        self,
        embedding_model: Any,
        tokenizer: Any = None
    ):
        """Store the embedding model and optional tokenizer.

        Args:
            embedding_model: Object with an awaitable
                ``embed(list_of_texts)`` method.
            tokenizer: Optional tokenizer forwarded to ``_count_tokens``.
        """
        self.embedding_model = embedding_model
        self.tokenizer = tokenizer

    async def compress(
        self,
        text: str,
        target_tokens: int = None,
        target_ratio: float = 0.5,
        query: str = None
    ) -> CompressionResult:
        """Keep the query-relevant chunks of ``text`` within the budget.

        Args:
            text: Text to compress.
            target_tokens: Absolute token budget; takes precedence over
                ``target_ratio`` whenever it is not ``None`` (including 0).
            target_ratio: Fraction of the original tokens to keep otherwise.
            query: Query that relevance is measured against. When empty or
                ``None``, compression is delegated to ``TextRankCompressor``.

        Returns:
            A ``CompressionResult`` whose text is the selected chunks in
            document order, joined by single spaces.
        """
        if not query:
            # Fall back to general compression.
            compressor = TextRankCompressor(self.embedding_model)
            return await compressor.compress(text, target_tokens, target_ratio)

        original_tokens = self._count_tokens(text, self.tokenizer)

        chunks = self._split_chunks(text)

        # ``scored`` is in chunk order, so positions give the document order.
        scored = await self._score_by_relevance(chunks, query)

        if target_tokens is not None:
            budget = target_tokens
        else:
            budget = int(original_tokens * target_ratio)

        # Most relevant first; the sort is stable for equal scores.
        ranked = sorted(
            range(len(scored)),
            key=lambda pos: scored[pos][1],
            reverse=True
        )

        kept = []
        used = 0
        for pos in ranked:
            cost = self._count_tokens(scored[pos][0], self.tokenizer)
            if used + cost <= budget:
                kept.append(pos)
                used += cost

        # Restore order by position. Chunks are rebuilt with single spaces,
        # so they need not be substrings of ``text``; looking them up with
        # ``text.index`` fails on newlines or repeated whitespace.
        kept.sort()
        compressed = " ".join(scored[pos][0] for pos in kept)

        compressed_tokens = self._count_tokens(compressed, self.tokenizer)

        return CompressionResult(
            original=text,
            compressed=compressed,
            original_tokens=original_tokens,
            compressed_tokens=compressed_tokens,
            compression_ratio=compressed_tokens / original_tokens if original_tokens > 0 else 1.0
        )

    def _split_chunks(self, text: str, chunk_size: int = 100) -> list[str]:
        """Split ``text`` into chunks of at most ``chunk_size`` words."""

        words = text.split()
        return [
            " ".join(words[start:start + chunk_size])
            for start in range(0, len(words), chunk_size)
        ]

    async def _score_by_relevance(
        self,
        chunks: list[str],
        query: str
    ) -> list[tuple[str, float]]:
        """Return ``(chunk, cosine_similarity_to_query)`` pairs in chunk order."""

        if not chunks:
            return []

        # Embed the query and all chunks in a single call.
        embeddings = await self.embedding_model.embed([query] + chunks)

        query_embedding = np.array(embeddings[0], dtype=float)
        chunk_embeddings = np.array(embeddings[1:], dtype=float)

        # Normalise; zero-norm vectors are left as zeros (similarity 0)
        # instead of producing NaN.
        query_norm = np.linalg.norm(query_embedding)
        if query_norm > 0:
            query_embedding = query_embedding / query_norm
        norms = np.linalg.norm(chunk_embeddings, axis=1, keepdims=True)
        norms[norms == 0] = 1.0
        chunk_embeddings = chunk_embeddings / norms

        similarities = np.dot(chunk_embeddings, query_embedding)

        return list(zip(chunks, similarities.tolist()))

LLM-Based Compression

from dataclasses import dataclass
from typing import Any, Optional

class LLMCompressor(PromptCompressor):
    """Compress text by asking a chat model to rewrite it more concisely."""

    def __init__(
        self,
        client: Any,
        model: str = "gpt-4o-mini",
        tokenizer: Any = None
    ):
        """Store the chat client, model name and optional tokenizer.

        Args:
            client: Async client exposing ``chat.completions.create(...)``.
            model: Model name passed to the client.
            tokenizer: Optional tokenizer forwarded to ``_count_tokens``.
        """
        self.client = client
        self.model = model
        self.tokenizer = tokenizer

    async def compress(
        self,
        text: str,
        target_tokens: int = None,
        target_ratio: float = 0.5,
        preserve_facts: bool = True
    ) -> CompressionResult:
        """Compress ``text`` with a single chat-completion request.

        Args:
            text: Text to compress.
            target_tokens: Token budget, converted to words at roughly 3/4
                word per token; takes precedence over ``target_ratio``
                whenever it is not ``None``.
            target_ratio: Fraction of the original word count to request
                when ``target_tokens`` is not given.
            preserve_facts: Selects the instruction line asking the model to
                keep factual details rather than main ideas only.

        Returns:
            A ``CompressionResult``. If the model returns no usable content,
            the original text is returned uncompressed.
        """
        original_tokens = self._count_tokens(text, self.tokenizer)

        if target_tokens is not None:
            target_words = target_tokens * 3 // 4  # Rough conversion
        else:
            target_words = int(len(text.split()) * target_ratio)
        # Never ask the model for "approximately 0 words".
        target_words = max(1, target_words)

        prompt = f"""Compress the following text to approximately {target_words} words while preserving the key information.

{"Preserve all factual claims and specific details." if preserve_facts else "Focus on main ideas only."}

Text to compress:
{text}

Compressed version:"""

        response = await self.client.chat.completions.create(
            model=self.model,
            messages=[{"role": "user", "content": prompt}],
            temperature=0
        )

        # ``content`` may be None (for example when the model returns no
        # text). Keep the original text rather than crash or hand back an
        # empty prompt.
        content = response.choices[0].message.content
        compressed = (content or "").strip() or text
        compressed_tokens = self._count_tokens(compressed, self.tokenizer)

        return CompressionResult(
            original=text,
            compressed=compressed,
            original_tokens=original_tokens,
            compressed_tokens=compressed_tokens,
            compression_ratio=compressed_tokens / original_tokens if original_tokens > 0 else 1.0
        )

class ChainOfDensityCompressor(PromptCompressor):
    """Summarise a text, then repeatedly rewrite the summary to be denser."""

    def __init__(
        self,
        client: Any,
        model: str = "gpt-4o-mini",
        iterations: int = 3
    ):
        """Remember the chat client, model name and number of densify rounds."""
        self.client, self.model, self.iterations = client, model, iterations

    async def compress(
        self,
        text: str,
        target_tokens: int = None,
        target_ratio: float = None
    ) -> CompressionResult:
        """Produce a dense summary of ``text``.

        ``target_tokens`` and ``target_ratio`` are accepted but not used;
        the output length is whatever the model produces.
        """
        source_tokens = self._count_tokens(text)

        # One initial draft, then ``self.iterations`` densify rounds numbered
        # from 1.
        draft = await self._initial_summary(text)
        for round_no in range(1, self.iterations + 1):
            draft = await self._densify(text, draft, round_no)

        draft_tokens = self._count_tokens(draft)

        if source_tokens > 0:
            ratio = draft_tokens / source_tokens
        else:
            ratio = 1.0

        return CompressionResult(
            original=text,
            compressed=draft,
            original_tokens=source_tokens,
            compressed_tokens=draft_tokens,
            compression_ratio=ratio
        )

    async def _ask(self, prompt: str) -> str:
        """Send ``prompt`` as a single user message and return the stripped reply."""
        reply = await self.client.chat.completions.create(
            model=self.model,
            messages=[{"role": "user", "content": prompt}],
            temperature=0
        )
        return reply.choices[0].message.content.strip()

    async def _initial_summary(self, text: str) -> str:
        """Ask the model for a 2-3 sentence summary of ``text``."""
        request = f"""Write a concise summary of the following text in 2-3 sentences:

{text}

Summary:"""
        return await self._ask(request)

    async def _densify(
        self,
        original: str,
        current_summary: str,
        iteration: int
    ) -> str:
        """Ask the model to add missing details to the summary without lengthening it."""
        request = f"""Here is an article and its current summary. Make the summary denser by:
1. Identifying 1-2 important entities or details missing from the summary
2. Adding them to the summary without increasing length significantly
3. Removing less important filler words to make room

Article:
{original}

Current Summary (iteration {iteration}):
{current_summary}

Denser Summary:"""
        return await self._ask(request)

class LongLLMCompressor(PromptCompressor):
    """Placeholder for an LLMLingua-style compressor; currently truncates text."""

    def __init__(
        self,
        model_name: str = "microsoft/llmlingua-2-bert-base-multilingual-cased-meetingbank",
        device: str = "cpu"
    ):
        """Record the model name and device; nothing is loaded here."""
        self._compressor = None
        self.device = device
        self.model_name = model_name

    def _load_compressor(self):
        """Lazy-load hook for the compression model (currently a no-op)."""
        if self._compressor is not None:
            return
        # A real implementation would build the model here, e.g.:
        #   from llmlingua import PromptCompressor
        #   self._compressor = PromptCompressor(
        #       model_name=self.model_name,
        #       device_map=self.device
        #   )

    async def compress(
        self,
        text: str,
        target_tokens: int = None,
        target_ratio: float = 0.5,
        instruction: str = None
    ) -> CompressionResult:
        """Truncate ``text`` to ``target_ratio`` of its character length.

        ``target_tokens`` and ``instruction`` are accepted but not used by
        the current truncation-based implementation.
        """
        self._load_compressor()

        source_tokens = self._count_tokens(text)

        # With or without a loaded model the result is the same character
        # truncation today. A real model would instead call
        #   self._compressor.compress_prompt(
        #       text, instruction=instruction,
        #       target_token=target_tokens, rate=target_ratio)
        # and use result["compressed_prompt"].
        keep_chars = int(len(text) * target_ratio)
        shortened = text[:keep_chars]

        shortened_tokens = self._count_tokens(shortened)

        if source_tokens > 0:
            ratio = shortened_tokens / source_tokens
        else:
            ratio = 1.0

        return CompressionResult(
            original=text,
            compressed=shortened,
            original_tokens=source_tokens,
            compressed_tokens=shortened_tokens,
            compression_ratio=ratio
        )

Adaptive Compression

from dataclasses import dataclass
from typing import Any, Optional
from enum import Enum

class CompressionStrategy(Enum):
    """Names of the available compression approaches."""

    STOPWORD = "stopword"      # rule-based stopword/filler removal
    EXTRACTIVE = "extractive"  # sentence or chunk selection
    LLM = "llm"                # rewrite by a language model
    HYBRID = "hybrid"          # several stages chained together

@dataclass
class CompressionConfig:
    """Settings bundle for adaptive compression."""

    # Ratio used when the caller gives no explicit token target.
    target_ratio: float = 0.5
    # NOTE(review): the next three fields are not read by any code visible
    # in this file; presumably ratio bounds and a quality floor. Confirm.
    min_ratio: float = 0.3
    max_ratio: float = 0.8
    quality_threshold: float = 0.8
    # Preferred strategy; defaults to the multi-stage hybrid.
    strategy: CompressionStrategy = CompressionStrategy.HYBRID

class AdaptiveCompressor:
    """Pick a compression strategy from the text length and target ratio.

    NOTE(review): ``config.strategy``, ``min_ratio``, ``max_ratio`` and
    ``quality_threshold`` are not consulted here, and ``_choose_strategy``
    never returns ``HYBRID``, so ``_hybrid_compress`` is only reachable by
    calling it directly.
    """

    def __init__(
        self,
        embedding_model: Any = None,
        llm_client: Any = None,
        config: CompressionConfig = None
    ):
        """Build the per-strategy compressors.

        Args:
            embedding_model: Optional embedding model shared by the
                extractive compressors.
            llm_client: Optional chat client; without it the LLM strategy is
                never selected.
            config: Optional settings; defaults to ``CompressionConfig()``.
        """
        self.embedding_model = embedding_model
        self.llm_client = llm_client
        self.config = config or CompressionConfig()

        # Initialize compressors
        self.stopword = StopwordPruner()
        self.extractive = TextRankCompressor(embedding_model)
        self.llm = LLMCompressor(llm_client) if llm_client else None

    async def compress(
        self,
        text: str,
        target_tokens: int = None,
        query: str = None
    ) -> CompressionResult:
        """Compress ``text`` with the strategy chosen for it.

        Args:
            text: Text to compress.
            target_tokens: Optional absolute token budget, converted to a
                ratio of the estimated original token count.
            query: Optional query; enables query-focused extraction when an
                embedding model is configured.
        """
        original_tokens = self._count_tokens(text)

        # Texts shorter than four characters estimate to zero tokens, so
        # guard the division and fall back to the configured ratio.
        if target_tokens is not None and original_tokens > 0:
            target_ratio = target_tokens / original_tokens
        else:
            target_ratio = self.config.target_ratio

        strategy = self._choose_strategy(text, target_ratio)

        if strategy == CompressionStrategy.STOPWORD:
            result = await self.stopword.compress(text, target_ratio=target_ratio)

        elif strategy == CompressionStrategy.EXTRACTIVE:
            result = await self._extractive_compress(text, target_ratio, query)

        elif strategy == CompressionStrategy.LLM and self.llm:
            result = await self.llm.compress(text, target_ratio=target_ratio)

        else:  # HYBRID
            result = await self._hybrid_compress(text, target_ratio, query)

        return result

    def _choose_strategy(
        self,
        text: str,
        target_ratio: float
    ) -> CompressionStrategy:
        """Map word count and target ratio to a strategy."""

        word_count = len(text.split())

        # Short text: stopword pruning
        if word_count < 100:
            return CompressionStrategy.STOPWORD

        # Aggressive compression: use LLM
        if target_ratio < 0.3 and self.llm:
            return CompressionStrategy.LLM

        # Moderate compression: extractive
        if target_ratio < 0.6:
            return CompressionStrategy.EXTRACTIVE

        # Light compression: stopword
        return CompressionStrategy.STOPWORD

    async def _extractive_compress(
        self,
        text: str,
        target_ratio: float,
        query: str = None
    ) -> CompressionResult:
        """Run query-focused extraction when possible, else TextRank."""

        if query and self.embedding_model:
            compressor = QueryFocusedCompressor(self.embedding_model)
            return await compressor.compress(
                text, target_ratio=target_ratio, query=query
            )
        return await self.extractive.compress(text, target_ratio=target_ratio)

    async def _hybrid_compress(
        self,
        text: str,
        target_ratio: float,
        query: str = None
    ) -> CompressionResult:
        """Apply stopword pruning, then extraction if the target is not met."""

        original_tokens = self._count_tokens(text)

        # Stage 1: Stopword pruning (light)
        stage1 = await self.stopword.compress(text, target_ratio=0.8)
        compressed = stage1.compressed

        # Stage 2: extraction over the pruned text, only when stage 1 did
        # not reach the target. The remaining ratio is relative to stage 1's
        # output; the > 0 check protects the division.
        if stage1.compression_ratio > target_ratio and stage1.compression_ratio > 0:
            remaining_ratio = target_ratio / stage1.compression_ratio
            stage2 = await self._extractive_compress(
                stage1.compressed, remaining_ratio, query
            )
            compressed = stage2.compressed

        compressed_tokens = self._count_tokens(compressed)

        return CompressionResult(
            original=text,
            compressed=compressed,
            original_tokens=original_tokens,
            compressed_tokens=compressed_tokens,
            compression_ratio=compressed_tokens / original_tokens if original_tokens > 0 else 1.0
        )

    def _count_tokens(self, text: str) -> int:
        """Estimate tokens as one per four characters."""
        return len(text) // 4

class QualityAwareCompressor:
    """Wrap a compressor and back off to gentler ratios when quality is low."""

    def __init__(
        self,
        compressor: "PromptCompressor",
        evaluator: Any = None,
        min_quality: float = 0.8
    ):
        """Store the wrapped compressor and quality settings.

        Args:
            compressor: Object with an awaitable
                ``compress(text, target_ratio=...)`` method.
            evaluator: Optional quality evaluator; when falsy no quality
                check is performed.
            min_quality: Lowest acceptable quality score.
        """
        self.compressor = compressor
        self.evaluator = evaluator
        self.min_quality = min_quality

    async def compress(
        self,
        text: str,
        target_ratio: float = 0.5
    ) -> "CompressionResult":
        """Compress ``text``, relaxing the ratio until quality is acceptable.

        The requested ratio is tried first. If its quality is below
        ``min_quality``, a binary search between ``target_ratio`` and 1.0
        looks for the most aggressive ratio that still passes.

        Returns:
            The most aggressive result that met ``min_quality``. If no
            attempt met it, the last (least aggressive) attempt is returned.
        """
        result = await self.compressor.compress(text, target_ratio=target_ratio)

        if not self.evaluator:
            return result

        quality = await self._evaluate_quality(text, result.compressed)
        if quality >= self.min_quality:
            return result

        # Track the best passing attempt separately: the last ratio probed
        # by the search may itself fail the quality check.
        best = None
        low, high = target_ratio, 1.0

        while high - low > 0.05:
            mid = (low + high) / 2
            result = await self.compressor.compress(text, target_ratio=mid)
            quality = await self._evaluate_quality(text, result.compressed)

            if quality >= self.min_quality:
                best = result
                high = mid
            else:
                low = mid

        return best if best is not None else result

    async def _evaluate_quality(
        self,
        original: str,
        compressed: str
    ) -> float:
        """Return a quality score for ``compressed`` relative to ``original``.

        Placeholder: returns 1.0 without an evaluator and a fixed 0.9
        otherwise. A real implementation could use embedding similarity or
        an LLM judge.
        """
        if not self.evaluator:
            return 1.0

        # Would use actual evaluator
        # Could be embedding similarity, LLM judge, etc.
        return 0.9

Production Compression Service

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import Optional

# ASGI application object; the route decorators below register on it.
app = FastAPI()

class CompressRequest(BaseModel):
    """Request body for the single-text compression endpoint."""

    # Text to compress (required).
    text: str
    # Optional absolute token budget.
    target_tokens: Optional[int] = None
    # Desired compressed/original ratio.
    target_ratio: float = 0.5
    # Strategy name; "auto" lets the service decide.
    strategy: str = "auto"
    # Optional query for query-focused compression.
    query: Optional[str] = None

class CompressResponse(BaseModel):
    """Response body describing one compression outcome."""

    # Compressed text.
    compressed: str
    # Token counts before and after compression.
    original_tokens: int
    compressed_tokens: int
    # Ratio reported by the compressor.
    compression_ratio: float
    # original_tokens minus compressed_tokens.
    tokens_saved: int
    # Strategy label echoed back to the caller.
    strategy_used: str

# Module-level compressor instance shared by all request handlers.
stopword_compressor = StopwordPruner()
# The extractive and adaptive compressors are left disabled; enabling them
# requires an embedding model (and an LLM client for the adaptive one):
# extractive_compressor = TextRankCompressor(embedding_model)
# adaptive_compressor = AdaptiveCompressor(embedding_model, llm_client)

@app.post("/v1/compress")
async def compress(request: CompressRequest) -> CompressResponse:
    """Compress one text and report the token savings.

    Every strategy is currently served by the stopword pruner; the
    extractive and adaptive compressors are not wired in. Only the
    ``strategy_used`` label differs, and any strategy other than
    "stopword" or "extractive" is reported as "auto".
    """
    outcome = await stopword_compressor.compress(
        request.text,
        target_ratio=request.target_ratio
    )

    # Placeholder routing: swap in extractive_compressor /
    # adaptive_compressor here once they are initialised.
    label = request.strategy if request.strategy in ("stopword", "extractive") else "auto"

    return CompressResponse(
        compressed=outcome.compressed,
        original_tokens=outcome.original_tokens,
        compressed_tokens=outcome.compressed_tokens,
        compression_ratio=outcome.compression_ratio,
        tokens_saved=outcome.tokens_saved,
        strategy_used=label
    )

@app.post("/v1/compress/batch")
async def compress_batch(texts: list[str], target_ratio: float = 0.5):
    """Compress each text in turn with the stopword pruner.

    Returns a dict with a ``results`` list holding, per input text, the
    compressed text and its compression ratio.
    """
    # Awaited one at a time, in input order.
    outcomes = [
        await stopword_compressor.compress(item, target_ratio=target_ratio)
        for item in texts
    ]

    return {
        "results": [
            {
                "compressed": outcome.compressed,
                "compression_ratio": outcome.compression_ratio
            }
            for outcome in outcomes
        ]
    }

@app.get("/v1/strategies")
async def list_strategies():
    """Describe the compression strategies this service advertises."""
    # (name, description, best_for) rows, in the order they are returned.
    catalog = (
        (
            "stopword",
            "Remove stopwords and filler words",
            "Light compression (>70% retention)",
        ),
        (
            "extractive",
            "Select most important sentences",
            "Moderate compression (40-70% retention)",
        ),
        (
            "llm",
            "Use LLM to rewrite concisely",
            "Aggressive compression (<40% retention)",
        ),
        (
            "auto",
            "Automatically choose best strategy",
            "General use",
        ),
    )

    return {
        "strategies": [
            {"name": name, "description": description, "best_for": best_for}
            for name, description, best_for in catalog
        ]
    }

@app.get("/health")
async def health():
    """Liveness probe: always reports a healthy status."""
    payload = {"status": "healthy"}
    return payload

References

Conclusion

Prompt compression is essential for managing context windows and costs. Start with simple techniques—stopword removal and filler pruning can reduce tokens by 20-30% with no quality loss. For more aggressive compression, use extractive methods like TextRank to select the most important sentences. When you have a query, use query-focused compression to keep only relevant content. For maximum compression, LLM-based abstractive summarization can achieve 50-80% reduction while preserving key information. The Chain of Density technique iteratively densifies summaries for optimal information density. Implement adaptive compression that chooses strategies based on text length and target ratio. Monitor compression quality—use embedding similarity or LLM judges to ensure compressed prompts still contain necessary information. The key insight is that most prompts contain significant redundancy—careful compression removes this redundancy while preserving the information LLMs need to generate quality responses, reducing costs and enabling longer effective context.


Discover more from C4: Container, Code, Cloud & Context

Subscribe to get the latest posts sent to your email.

Leave a Reply

Your email address will not be published. Required fields are marked *

This site uses Akismet to reduce spam. Learn how your comment data is processed.