Source code for kerb.retrieval.context

"""Context management utilities for retrieval.

This module provides functions for compressing and filtering search results.
"""

from typing import TYPE_CHECKING, Any, Dict, List, Optional, Union

from kerb.core.types import Document

from .structures import FilterConfig, SearchResult

if TYPE_CHECKING:
    from kerb.core.enums import CompressionStrategy


[docs] def compress_context( query: str, results: List[SearchResult], max_tokens: int = 2000, strategy: Union["CompressionStrategy", str] = "top_k", ) -> List[SearchResult]: """Compress retrieved context to fit within token limits. Args: query: The search query results: Search results to compress max_tokens: Maximum number of tokens (approximate) strategy: Compression strategy (CompressionStrategy enum or string: "top_k", "summarize", "filter", "truncate") Returns: List[SearchResult]: Compressed results Examples: >>> from kerb.core.enums import CompressionStrategy >>> compressed = compress_context(query, results, max_tokens=1000, strategy=CompressionStrategy.TOP_K) """ from kerb.core.enums import CompressionStrategy, validate_enum_or_string if not results: return [] # Validate and normalize strategy strategy_val = validate_enum_or_string(strategy, CompressionStrategy, "strategy") if isinstance(strategy_val, CompressionStrategy): strategy_str = strategy_val.value else: strategy_str = strategy_val # Rough token estimation (4 chars ≈ 1 token) chars_per_token = 4 max_chars = max_tokens * chars_per_token if strategy_str == "top_k": # Simply take top results until token limit compressed = [] total_chars = 0 for result in results: doc_chars = len(result.document.content) if total_chars + doc_chars <= max_chars: compressed.append(result) total_chars += doc_chars else: break return compressed elif strategy_str == "summarize": # Smart compression: extract most relevant sentences from each document compressed = [] query_terms = set(query.lower().split()) total_chars = 0 for result in results: if total_chars >= max_chars: break # Split into sentences sentences = [ s.strip() + "." for s in result.document.content.split(".") if s.strip() ] # Score sentences by query term overlap sentence_scores = [] for sent in sentences: sent_terms = set(sent.lower().split()) overlap = len(query_terms & sent_terms) sentence_scores.append((sent, overlap)) # Sort by relevance sentence_scores.sort(key=lambda x: x[1], reverse=True) # Take top sentences that fit compressed_content = [] doc_chars = 0 for sent, _ in sentence_scores: sent_chars = len(sent) if total_chars + doc_chars + sent_chars <= max_chars: compressed_content.append(sent) doc_chars += sent_chars else: break if compressed_content: # Create compressed document compressed_doc = Document( id=result.document.id, content=" ".join(compressed_content), metadata=result.document.metadata, score=result.document.score, ) compressed.append( SearchResult( document=compressed_doc, score=result.score, rank=result.rank, method="compressed_smart", ) ) total_chars += doc_chars return compressed elif strategy_str == "filter": # Extract only query-relevant excerpts compressed = [] query_terms = set(query.lower().split()) total_chars = 0 for result in results: if total_chars >= max_chars: break content = result.document.content words = content.split() # Find windows containing query terms window_size = 50 # words per window best_window = [] best_overlap = 0 for i in range(0, len(words), window_size // 2): window = words[i : i + window_size] window_terms = set(w.lower() for w in window) overlap = len(query_terms & window_terms) if overlap > best_overlap: best_overlap = overlap best_window = window if best_window: excerpt = " ".join(best_window) excerpt_chars = len(excerpt) if total_chars + excerpt_chars <= max_chars: compressed_doc = Document( id=result.document.id, content=excerpt, metadata=result.document.metadata, score=result.document.score, ) compressed.append( SearchResult( document=compressed_doc, score=result.score, rank=result.rank, method="compressed_extract", ) ) total_chars += excerpt_chars return compressed elif strategy_str == "truncate": # Simple truncate each document to fit compressed = [] total_chars = 0 docs_to_include = min( len(results), max_chars // (max_chars // max(1, len(results))) ) chars_per_doc = max_chars // max(1, docs_to_include) for i, result in enumerate(results[:docs_to_include]): content = result.document.content if len(content) > chars_per_doc: content = content[: chars_per_doc - 3] + "..." compressed_doc = Document( id=result.document.id, content=content, metadata=result.document.metadata, score=result.document.score, ) compressed.append( SearchResult( document=compressed_doc, score=result.score, rank=result.rank, method="compressed_truncate", ) ) return compressed # Default: return top_k strategy return compress_context(query, results, max_tokens, "top_k")
[docs] def filter_results( results: List[SearchResult], min_score: Optional[float] = None, max_results: Optional[int] = None, metadata_filter: Optional[Dict[str, Any]] = None, dedup_threshold: float = 0.9, config: Optional[FilterConfig] = None, ) -> List[SearchResult]: """Filter search results based on various criteria. Args: results: Search results to filter min_score: Minimum score threshold (ignored if config is provided) max_results: Maximum number of results (ignored if config is provided) metadata_filter: Filter by metadata fields (ignored if config is provided) dedup_threshold: Similarity threshold for deduplication (ignored if config is provided) config: FilterConfig object with all parameters (recommended) Returns: List[SearchResult]: Filtered results Examples: >>> # Using config object (recommended) >>> from kerb.retrieval import FilterConfig >>> config = FilterConfig( ... min_score=0.5, ... max_results=10, ... metadata_filter={"category": "tech"}, ... dedup_threshold=0.9 ... ) >>> filtered = filter_results(results, config=config) >>> # Using individual parameters (backward compatible) >>> filtered = filter_results( ... results, ... min_score=0.5, ... max_results=10, ... metadata_filter={"category": "tech"} ... ) """ # Use config if provided, otherwise use individual parameters if config is not None: min_score = config.min_score max_results = config.max_results metadata_filter = config.metadata_filter dedup_threshold = config.dedup_threshold filtered = results # Filter by minimum score if min_score is not None: filtered = [r for r in filtered if r.score >= min_score] # Filter by metadata if metadata_filter: filtered = [ r for r in filtered if all(r.document.metadata.get(k) == v for k, v in metadata_filter.items()) ] # Deduplicate similar results if dedup_threshold < 1.0: deduped = [] for result in filtered: is_duplicate = False for existing in deduped: # Simple content similarity check words1 = set(result.document.content.lower().split()) words2 = set(existing.document.content.lower().split()) if words1 and words2: similarity = len(words1 & words2) / len(words1 | words2) if similarity >= dedup_threshold: is_duplicate = True break if not is_duplicate: deduped.append(result) filtered = deduped # Limit results if max_results is not None: filtered = filtered[:max_results] # Update ranks for i, result in enumerate(filtered, 1): result.rank = i return filtered