"""Text deduplication operations."""
import hashlib
import re
from typing import Callable, Dict, List, Optional
from .enums import DeduplicationMode
from .text import normalize_whitespace
[docs]
def deduplicate_exact(texts: List[str], keep_order: bool = True) -> List[str]:
    """Drop repeated entries, keeping one copy of each distinct text.

    Args:
        texts: Texts to filter. Entries must be hashable.
        keep_order: When True, the first occurrence of each text is kept
            and the result follows the input order. When False, the
            result order is whatever set iteration yields.

    Returns:
        A new list containing each distinct text once.

    Examples:
        >>> deduplicate_exact(["a", "b", "a", "c"])
        ['a', 'b', 'c']
    """
    if not keep_order:
        return list(set(texts))

    # Dict keys are unique and remember insertion order, so this keeps
    # the first occurrence of every text in its original position.
    return list(dict.fromkeys(texts))
[docs]
def deduplicate_fuzzy(
    texts: List[str], similarity_threshold: float = 0.9, keep_order: bool = True
) -> List[str]:
    """Drop texts that are near-duplicates of an earlier kept text.

    Each text is lowercased and passed through ``normalize_whitespace``
    before being compared, via ``_simple_similarity``, against the
    comparison form of every text kept so far. The first text of each
    group of near-duplicates is the one that survives.

    Args:
        texts: List of texts
        similarity_threshold: A text is dropped when its similarity to
            any kept text is greater than or equal to this value (0-1).
        keep_order: Accepted for signature parity with the other
            deduplication functions; this function does not consult it
            and always returns the kept texts in input order.

    Returns:
        List with fuzzy duplicates removed (original, un-normalized texts).

    Examples:
        >>> deduplicate_fuzzy(["hello world", "hello world", "goodbye"])
        ['hello world', 'goodbye']
    """
    if not texts:
        return []

    # Comparison keys are computed once, up front, for every input text.
    comparison_keys = [normalize_whitespace(item.lower()) for item in texts]

    kept_texts = []
    kept_keys = []
    for original, key in zip(texts, comparison_keys):
        # Stops at the first sufficiently similar kept text.
        if any(
            _simple_similarity(key, earlier) >= similarity_threshold
            for earlier in kept_keys
        ):
            continue
        kept_texts.append(original)
        kept_keys.append(key)
    return kept_texts
[docs]
def deduplicate_semantic(
    texts: List[str],
    similarity_threshold: float = 0.85,
    embed_fn: Optional[Callable] = None,
) -> List[str]:
    """Remove semantically similar texts, keeping the first of each group.

    When ``embed_fn`` is supplied, every text is embedded and a text is
    dropped if the cosine similarity between its embedding and the
    embedding of any already-kept text reaches ``similarity_threshold``.

    When ``embed_fn`` is None there is no semantic signal available, so
    the call is delegated to ``deduplicate_fuzzy`` with the same
    threshold. That fallback compares surface text only, so differently
    worded texts with the same meaning (e.g. "hello" and "hi") are NOT
    merged by it.

    Args:
        texts: List of texts
        similarity_threshold: Similarity threshold (0-1); a similarity
            greater than or equal to this marks a duplicate.
        embed_fn: Optional callable mapping one text to its embedding
            vector. Called once per input text, in input order.

    Returns:
        A new list with duplicates removed, in input order. The input
        list is never returned itself, so callers may mutate the result.

    Examples:
        >>> deduplicate_semantic(["hello", "hello", "goodbye"])
        ['hello', 'goodbye']
    """
    # Nothing to compare: same empty-input result as deduplicate_fuzzy.
    if not texts:
        return []
    # A single text cannot have a duplicate. Return a copy rather than the
    # caller's own list so every code path hands back a fresh list.
    if len(texts) == 1:
        return list(texts)

    # If no embedding function is provided, fall back to fuzzy matching.
    if embed_fn is None:
        return deduplicate_fuzzy(texts, similarity_threshold)

    embeddings = [embed_fn(text) for text in texts]

    result = []
    kept_embeddings = []
    for text, embedding in zip(texts, embeddings):
        # Stops at the first kept embedding that is similar enough.
        is_duplicate = any(
            _cosine_similarity(embedding, kept) >= similarity_threshold
            for kept in kept_embeddings
        )
        if not is_duplicate:
            result.append(text)
            kept_embeddings.append(embedding)
    return result
[docs]
def deduplicate_lines(text: str, keep_order: bool = True) -> str:
    """Drop repeated lines from a block of text.

    The text is split on ``"\\n"``, the resulting lines are passed to
    ``deduplicate_exact``, and the survivors are joined back with
    ``"\\n"``. Lines are compared exactly, so blank lines count as
    duplicates of each other too.

    Args:
        text: Input text
        keep_order: Forwarded to ``deduplicate_exact``

    Returns:
        Text with duplicate lines removed; falsy input is returned as-is.

    Examples:
        >>> deduplicate_lines("line1\\nline2\\nline1\\nline3")
        'line1\\nline2\\nline3'
    """
    if text:
        return "\n".join(deduplicate_exact(text.split("\n"), keep_order))
    return text
[docs]
def deduplicate_sentences(text: str, keep_order: bool = True) -> str:
    """Drop repeated sentences from a block of text.

    A sentence boundary is a run of whitespace that follows ``.``, ``!``
    or ``?`` and is followed by an uppercase ASCII letter. Each piece is
    stripped, empty pieces are discarded, the rest go through
    ``deduplicate_exact``, and the survivors are joined with one space.

    Args:
        text: Input text
        keep_order: Forwarded to ``deduplicate_exact``

    Returns:
        Text with duplicate sentences removed; falsy input is returned
        as-is.

    Examples:
        >>> deduplicate_sentences("Hello. World. Hello.")
        'Hello. World.'
    """
    if not text:
        return text

    # Simple regex-based segmentation; see the boundary rule above.
    stripped_pieces = (
        piece.strip() for piece in re.split(r"(?<=[.!?])\s+(?=[A-Z])", text)
    )
    sentences = [piece for piece in stripped_pieces if piece]
    return " ".join(deduplicate_exact(sentences, keep_order))
[docs]
def find_duplicates(
    texts: List[str],
    mode: DeduplicationMode = DeduplicationMode.EXACT,
    similarity_threshold: float = 0.9,
) -> List[List[int]]:
    """Find duplicate texts without removing them.

    In ``DeduplicationMode.EXACT`` texts are grouped by equality.

    In every other mode texts are lowercased and compared pairwise with
    ``_simple_similarity``. No embedding function is available here, so
    non-exact modes other than FUZZY also use this comparison rather
    than failing or silently finding nothing. Grouping is greedy: each
    not-yet-assigned text collects all later unassigned texts that are
    similar enough to it, and a text belongs to at most one group.

    Args:
        texts: List of texts
        mode: Deduplication mode
        similarity_threshold: Minimum similarity (0-1) for two texts to
            be grouped in non-exact modes. Ignored in EXACT mode.

    Returns:
        List of index groups representing duplicates. Each group holds
        at least two indices in ascending order; texts without a
        duplicate do not appear.

    Examples:
        >>> find_duplicates(["a", "b", "a", "c", "b"])
        [[0, 2], [1, 4]]
    """
    if not texts:
        return []

    if mode == DeduplicationMode.EXACT:
        # Group indices by exact text, preserving first-seen order.
        groups: Dict[str, List[int]] = {}
        for index, text in enumerate(texts):
            groups.setdefault(text, []).append(index)
        # Return only groups with duplicates.
        return [indices for indices in groups.values() if len(indices) > 1]

    # Lowercase once instead of on every pairwise comparison.
    lowered = [text.lower() for text in texts]

    duplicate_groups = []
    assigned = set()
    for i, anchor in enumerate(lowered):
        if i in assigned:
            continue
        group = [i]
        for j in range(i + 1, len(lowered)):
            if j in assigned:
                continue
            if _simple_similarity(anchor, lowered[j]) >= similarity_threshold:
                group.append(j)
                assigned.add(j)
        if len(group) > 1:
            duplicate_groups.append(group)
    return duplicate_groups
[docs]
def compute_text_hash(text: str, algorithm: str = "md5") -> str:
    """Hash a text after normalizing it, for use as a deduplication key.

    Before hashing, the text is stripped, lowercased and passed through
    ``normalize_whitespace``, so texts that differ only in letter case
    or surrounding whitespace produce the same hash. The normalized text
    is encoded as UTF-8.

    Args:
        text: Input text
        algorithm: Hash algorithm (md5, sha1, sha256)

    Returns:
        Hex hash string, or ``""`` when ``text`` is falsy (in which case
        ``algorithm`` is not examined).

    Raises:
        ValueError: If ``algorithm`` is not one of the supported names.

    Examples:
        >>> hash1 = compute_text_hash("hello")
        >>> hash2 = compute_text_hash("hello")
        >>> hash1 == hash2
        True
    """
    if not text:
        return ""

    # Normalize first so equivalent texts hash identically.
    canonical = normalize_whitespace(text.strip().lower())

    if algorithm == "md5":
        hasher = hashlib.md5
    elif algorithm == "sha1":
        hasher = hashlib.sha1
    elif algorithm == "sha256":
        hasher = hashlib.sha256
    else:
        raise ValueError(f"Unsupported hash algorithm: {algorithm}")

    return hasher(canonical.encode("utf-8")).hexdigest()
# ============================================================================
# Helper Functions
# ============================================================================
def _simple_similarity(text1: str, text2: str) -> float:
"""Calculate simple similarity between two texts."""
if not text1 or not text2:
return 0.0
# Character-based Jaccard similarity
set1 = set(text1.lower())
set2 = set(text2.lower())
intersection = len(set1 & set2)
union = len(set1 | set2)
if union == 0:
return 0.0
return intersection / union
def _cosine_similarity(vec1: List[float], vec2: List[float]) -> float:
"""Calculate cosine similarity between two vectors."""
if len(vec1) != len(vec2):
return 0.0
dot_product = sum(a * b for a, b in zip(vec1, vec2))
magnitude1 = sum(a * a for a in vec1) ** 0.5
magnitude2 = sum(b * b for b in vec2) ** 0.5
if magnitude1 == 0 or magnitude2 == 0:
return 0.0
return dot_product / (magnitude1 * magnitude2)