"""Ground truth comparison metrics for evaluation.
This module provides metrics for comparing generated text against reference texts:
- BLEU: N-gram overlap with brevity penalty
- ROUGE: Recall-oriented n-gram and subsequence matching
- METEOR: Precision, recall, and word order
- Exact Match: Binary exact string matching
- F1 Score: Token-level precision and recall
- Semantic Similarity: Embedding-based and lexical similarity
"""
import math
import re
from collections import Counter
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, Union
if TYPE_CHECKING:
from kerb.core.enums import SimilarityMethod
# ============================================================================
# Public Metrics
# ============================================================================
[docs]
def calculate_bleu(
candidate: str,
reference: Union[str, List[str]],
n: int = 4,
weights: Optional[List[float]] = None,
) -> float:
"""Calculate BLEU score between candidate and reference text(s).
BLEU (Bilingual Evaluation Understudy) measures n-gram overlap with brevity penalty.
Args:
candidate: The generated text to evaluate
reference: Reference text(s) (ground truth)
n: Maximum n-gram length (default: 4 for BLEU-4)
weights: Weights for each n-gram (default: equal weights)
Returns:
float: BLEU score between 0 and 1
Example:
>>> calculate_bleu("the cat sat", "the cat sat on mat")
0.7598
"""
if isinstance(reference, str):
references = [reference]
else:
references = reference
if not candidate or not references:
return 0.0
# Tokenize
candidate_tokens = candidate.lower().split()
reference_tokens_list = [ref.lower().split() for ref in references]
if not candidate_tokens:
return 0.0
# Default weights
if weights is None:
weights = [1.0 / n] * n
# Calculate n-gram precisions
precisions = []
for i in range(1, n + 1):
candidate_ngrams = _get_ngrams(candidate_tokens, i)
# Get maximum counts from all references
max_ref_counts: Dict[Tuple, int] = {}
for ref_tokens in reference_tokens_list:
ref_ngrams = _get_ngrams(ref_tokens, i)
for ngram, count in ref_ngrams.items():
max_ref_counts[ngram] = max(max_ref_counts.get(ngram, 0), count)
# Calculate clipped counts
clipped_count = 0
total_count = 0
for ngram, count in candidate_ngrams.items():
clipped_count += min(count, max_ref_counts.get(ngram, 0))
total_count += count
precision = clipped_count / total_count if total_count > 0 else 0.0
precisions.append(precision)
# Geometric mean of precisions
if any(p == 0 for p in precisions):
geo_mean = 0.0
else:
log_sum = sum(w * math.log(p) for w, p in zip(weights, precisions))
geo_mean = math.exp(log_sum)
# Brevity penalty
candidate_len = len(candidate_tokens)
ref_len = min(len(ref) for ref in reference_tokens_list)
if candidate_len >= ref_len:
bp = 1.0
else:
bp = math.exp(1 - ref_len / candidate_len) if candidate_len > 0 else 0.0
return bp * geo_mean
[docs]
def calculate_rouge(
candidate: str,
reference: Union[str, List[str]],
rouge_type: str = "rouge-l",
beta: float = 1.2,
) -> Dict[str, float]:
"""Calculate ROUGE scores between candidate and reference text(s).
ROUGE (Recall-Oriented Understudy for Gisting Evaluation) measures recall of n-grams.
Args:
candidate: The generated text to evaluate
reference: Reference text(s) (ground truth)
rouge_type: Type of ROUGE - "rouge-1", "rouge-2", "rouge-l"
beta: Beta parameter for F-measure (default: 1.2 favors recall)
Returns:
dict: Dictionary with 'precision', 'recall', 'fmeasure' scores
Example:
>>> calculate_rouge("the cat sat", "the cat sat on mat", "rouge-1")
{'precision': 1.0, 'recall': 0.6, 'fmeasure': 0.75}
"""
if isinstance(reference, str):
references = [reference]
else:
references = reference
if not candidate or not references:
return {"precision": 0.0, "recall": 0.0, "fmeasure": 0.0}
candidate_tokens = candidate.lower().split()
if rouge_type == "rouge-1":
return _rouge_n(candidate_tokens, references, 1, beta)
elif rouge_type == "rouge-2":
return _rouge_n(candidate_tokens, references, 2, beta)
elif rouge_type == "rouge-l":
return _rouge_l(candidate_tokens, references, beta)
else:
raise ValueError(f"Unknown ROUGE type: {rouge_type}")
[docs]
def calculate_meteor(
candidate: str,
reference: Union[str, List[str]],
alpha: float = 0.9,
beta: float = 3.0,
gamma: float = 0.5,
) -> float:
"""Calculate METEOR score (simplified version without stemming/synonyms).
METEOR considers precision, recall, and word order with harmonic mean.
Args:
candidate: The generated text to evaluate
reference: Reference text(s) (ground truth)
alpha: Weight for recall vs precision (default: 0.9)
beta: Shape parameter for f-mean (default: 3.0)
gamma: Penalty weight for fragmentation (default: 0.5)
Returns:
float: METEOR score between 0 and 1
Example:
>>> calculate_meteor("the cat sat", "the cat sat on mat")
0.833
"""
if isinstance(reference, str):
references = [reference]
else:
references = reference
if not candidate or not references:
return 0.0
candidate_tokens = candidate.lower().split()
# Calculate against best reference
best_score = 0.0
for ref in references:
ref_tokens = ref.lower().split()
# Find matches
matches = _find_matches(candidate_tokens, ref_tokens)
num_matches = len(matches)
if num_matches == 0:
continue
# Precision and recall
precision = num_matches / len(candidate_tokens) if candidate_tokens else 0.0
recall = num_matches / len(ref_tokens) if ref_tokens else 0.0
# F-mean
if precision + recall > 0:
fmean = (precision * recall) / (alpha * precision + (1 - alpha) * recall)
else:
fmean = 0.0
# Fragmentation penalty
chunks = _count_chunks(matches)
fragmentation = chunks / num_matches if num_matches > 0 else 1.0
penalty = gamma * (fragmentation**beta)
score = fmean * (1 - penalty)
best_score = max(best_score, score)
return best_score
[docs]
def calculate_exact_match(candidate: str, reference: Union[str, List[str]]) -> float:
"""Calculate exact match score (1.0 if exact match, 0.0 otherwise).
Args:
candidate: The generated text to evaluate
reference: Reference text(s) (ground truth)
Returns:
float: 1.0 if exact match, 0.0 otherwise
Example:
>>> calculate_exact_match("Paris", "Paris")
1.0
"""
if isinstance(reference, str):
references = [reference]
else:
references = reference
candidate_normalized = candidate.strip().lower()
for ref in references:
if candidate_normalized == ref.strip().lower():
return 1.0
return 0.0
[docs]
def calculate_f1_score(candidate: str, reference: Union[str, List[str]]) -> float:
"""Calculate token-level F1 score.
Args:
candidate: The generated text to evaluate
reference: Reference text(s) (ground truth)
Returns:
float: F1 score between 0 and 1
Example:
>>> calculate_f1_score("the cat sat", "the cat sat on mat")
0.857
"""
if isinstance(reference, str):
references = [reference]
else:
references = reference
if not candidate or not references:
return 0.0
candidate_tokens = set(candidate.lower().split())
best_f1 = 0.0
for ref in references:
ref_tokens = set(ref.lower().split())
if not candidate_tokens or not ref_tokens:
continue
common = candidate_tokens & ref_tokens
if not common:
continue
precision = len(common) / len(candidate_tokens)
recall = len(common) / len(ref_tokens)
f1 = 2 * precision * recall / (precision + recall)
best_f1 = max(best_f1, f1)
return best_f1
[docs]
def calculate_semantic_similarity(
text1: str, text2: str, method: Union["SimilarityMethod", str] = "embedding"
) -> float:
"""Calculate semantic similarity between two texts.
Args:
text1: First text
text2: Second text
method: Similarity method (SimilarityMethod enum or string: "embedding", "cosine", "jaccard", "bleu", "rouge", "bertscore")
Returns:
float: Similarity score between 0 and 1
Examples:
>>> from kerb.core.enums import SimilarityMethod
>>> score = calculate_semantic_similarity(text1, text2, method=SimilarityMethod.EMBEDDING)
>>> calculate_semantic_similarity("cat", "kitten", method=SimilarityMethod.JACCARD)
0.0
>>> calculate_semantic_similarity("the cat sat", "the cat sits", method=SimilarityMethod.JACCARD)
0.5
"""
from kerb.core.enums import SimilarityMethod, validate_enum_or_string
if not text1 or not text2:
return 0.0
# Validate and normalize method
method_val = validate_enum_or_string(method, SimilarityMethod, "method")
if isinstance(method_val, SimilarityMethod):
method_str = method_val.value
else:
method_str = method_val
if method_str in ("embedding", "cosine"):
# Try to use embedding module if available
try:
from ..embedding import cosine_similarity, embed
emb1 = embed(text1)
emb2 = embed(text2)
# Convert cosine similarity from [-1, 1] to [0, 1]
return (cosine_similarity(emb1, emb2) + 1) / 2
except ImportError:
# Fall back to Jaccard if embeddings not available
method_str = "jaccard"
if method_str == "jaccard":
tokens1 = set(text1.lower().split())
tokens2 = set(text2.lower().split())
if not tokens1 or not tokens2:
return 0.0
intersection = tokens1 & tokens2
union = tokens1 | tokens2
return len(intersection) / len(union) if union else 0.0
elif method == "tfidf":
# Simple TF-IDF cosine similarity
tokens1 = text1.lower().split()
tokens2 = text2.lower().split()
# Calculate TF
tf1 = Counter(tokens1)
tf2 = Counter(tokens2)
# Get all unique terms
all_terms = set(tf1.keys()) | set(tf2.keys())
# Simple IDF (just use log of inverse frequency)
total_docs = 2
doc_freq = {
term: (1 if term in tf1 else 0) + (1 if term in tf2 else 0)
for term in all_terms
}
idf = {
term: math.log(total_docs / freq) if freq > 0 else 0
for term, freq in doc_freq.items()
}
# Calculate TF-IDF vectors
vec1 = [tf1[term] * idf[term] for term in all_terms]
vec2 = [tf2[term] * idf[term] for term in all_terms]
# Cosine similarity
dot_product = sum(v1 * v2 for v1, v2 in zip(vec1, vec2))
magnitude1 = math.sqrt(sum(v * v for v in vec1))
magnitude2 = math.sqrt(sum(v * v for v in vec2))
if magnitude1 == 0 or magnitude2 == 0:
return 0.0
return dot_product / (magnitude1 * magnitude2)
else:
raise ValueError(f"Unknown similarity method: {method}")
# ============================================================================
# Helper Functions
# ============================================================================
def _get_ngrams(tokens: List[str], n: int) -> Dict[Tuple, int]:
"""Get n-grams from tokens."""
ngrams: Dict[Tuple, int] = Counter()
for i in range(len(tokens) - n + 1):
ngram = tuple(tokens[i : i + n])
ngrams[ngram] += 1
return dict(ngrams)
def _rouge_n(
candidate_tokens: List[str], references: List[str], n: int, beta: float
) -> Dict[str, float]:
"""Calculate ROUGE-N scores."""
candidate_ngrams = _get_ngrams(candidate_tokens, n)
# Get maximum counts from all references
max_ref_counts: Dict[Tuple, int] = {}
for ref in references:
ref_tokens = ref.lower().split()
ref_ngrams = _get_ngrams(ref_tokens, n)
for ngram, count in ref_ngrams.items():
max_ref_counts[ngram] = max(max_ref_counts.get(ngram, 0), count)
# Calculate matches
matches = sum(
min(count, max_ref_counts.get(ngram, 0))
for ngram, count in candidate_ngrams.items()
)
candidate_count = sum(candidate_ngrams.values())
reference_count = sum(max_ref_counts.values())
if candidate_count == 0 or reference_count == 0:
return {"precision": 0.0, "recall": 0.0, "fmeasure": 0.0}
precision = matches / candidate_count
recall = matches / reference_count
if precision + recall > 0:
fmeasure = (1 + beta**2) * precision * recall / (beta**2 * precision + recall)
else:
fmeasure = 0.0
return {"precision": precision, "recall": recall, "fmeasure": fmeasure}
def _rouge_l(
candidate_tokens: List[str], references: List[str], beta: float
) -> Dict[str, float]:
"""Calculate ROUGE-L (longest common subsequence) scores."""
best_lcs = 0
best_ref_len = 0
for ref in references:
ref_tokens = ref.lower().split()
lcs_length = _longest_common_subsequence(candidate_tokens, ref_tokens)
if lcs_length > best_lcs:
best_lcs = lcs_length
best_ref_len = len(ref_tokens)
if len(candidate_tokens) == 0 or best_ref_len == 0:
return {"precision": 0.0, "recall": 0.0, "fmeasure": 0.0}
precision = best_lcs / len(candidate_tokens)
recall = best_lcs / best_ref_len
if precision + recall > 0:
fmeasure = (1 + beta**2) * precision * recall / (beta**2 * precision + recall)
else:
fmeasure = 0.0
return {"precision": precision, "recall": recall, "fmeasure": fmeasure}
def _longest_common_subsequence(seq1: List[str], seq2: List[str]) -> int:
"""Calculate length of longest common subsequence."""
m, n = len(seq1), len(seq2)
dp = [[0] * (n + 1) for _ in range(m + 1)]
for i in range(1, m + 1):
for j in range(1, n + 1):
if seq1[i - 1] == seq2[j - 1]:
dp[i][j] = dp[i - 1][j - 1] + 1
else:
dp[i][j] = max(dp[i - 1][j], dp[i][j - 1])
return dp[m][n]
def _find_matches(tokens1: List[str], tokens2: List[str]) -> List[int]:
"""Find matching token indices."""
matches = []
used = set()
for i, token in enumerate(tokens1):
for j, ref_token in enumerate(tokens2):
if j not in used and token == ref_token:
matches.append(i)
used.add(j)
break
return matches
def _count_chunks(matches: List[int]) -> int:
"""Count number of contiguous chunks in matches."""
if not matches:
return 0
chunks = 1
for i in range(1, len(matches)):
if matches[i] != matches[i - 1] + 1:
chunks += 1
return chunks