High-Performance JSONL Utilities
This document describes the enhanced JSONL utilities in the kerb.fine_tuning module, designed for efficiently handling large-scale training datasets.
Overview
The enhanced JSONL utilities provide:
Compression Support: Automatic gzip, bz2, and xz compression
Parallel Processing: Multi-core reading and transformation
Memory-Efficient Streaming: Process files larger than RAM
Advanced Filtering: Filter and transform on-the-fly
Deduplication: Remove duplicate examples efficiently
File Splitting: Break large files into manageable chunks
Validation: Parallel validation for large datasets
Statistics: Analyze dataset characteristics
Performance Benefits
1. Compression
Save 70-90% disk space with automatic compression:
from kerb.fine_tuning import jsonl
# Write with compression
jsonl.write_jsonl(data, "training_data.jsonl", compress=True, compression_type='gz')
# Read automatically detects compression
data = jsonl.read_jsonl("training_data.jsonl.gz")
Benefits:
5-10x smaller file sizes
Faster network transfers
Reduced storage costs
Automatic format detection
2. Parallel Reading
Speed up reading of large files with complex JSON:
# Up to 4x faster for large files
data = jsonl.parallel_read_jsonl("large_dataset.jsonl", num_workers=4)
When to Use:
Files > 100MB
Complex nested JSON structures
High CPU availability
Fast storage (SSD)
3. Streaming
Process files larger than available RAM:
# Process 100GB file with 8GB RAM
for batch in jsonl.stream_jsonl("huge_dataset.jsonl", batch_size=1000):
# Process each batch
processed = process_batch(batch)
save_results(processed)
Benefits:
Constant memory usage
Process unlimited file sizes
Early termination possible
Filter while reading
Feature Guide
Compression Support
All functions support compressed files automatically:
# Writing
jsonl.write_jsonl(data, "data.jsonl.gz", compress=True)
# Reading (auto-detects compression)
data = jsonl.read_jsonl("data.jsonl.gz")
# Streaming
for batch in jsonl.stream_jsonl("data.jsonl.bz2"):
process(batch)
# Compression types: 'gz', 'bz2', 'xz'
Filtering
Memory-efficient filtering:
# Filter while streaming
jsonl.filter_jsonl(
input_file="all_data.jsonl",
output_file="filtered_data.jsonl",
filter_fn=lambda x: x["quality_score"] > 0.8 and len(x["text"]) > 100,
compress_output=True
)
Transformation
Apply transformations efficiently:
def normalize_and_augment(item):
return {
"text": item["text"].lower().strip(),
"label": item["label"],
"length": len(item["text"]),
"has_numbers": any(c.isdigit() for c in item["text"])
}
# Sequential transformation
jsonl.transform_jsonl("input.jsonl", "output.jsonl", normalize_and_augment)
# Parallel transformation (faster for expensive operations)
jsonl.transform_jsonl(
"input.jsonl",
"output.jsonl",
normalize_and_augment,
parallel=True,
num_workers=8
)
File Splitting
Split large files into chunks:
# Split by line count
output_files = jsonl.split_jsonl(
"large_dataset.jsonl",
"chunk",
split_size=10000, # 10k lines per file
by_line=True
)
# Creates: chunk_0000.jsonl, chunk_0001.jsonl, ...
# Split by byte size (useful for distributed processing)
output_files = jsonl.split_jsonl(
"large_dataset.jsonl",
"chunk",
split_size=10_000_000, # 10MB per file
by_line=False,
compress_output=True
)
Deduplication
Remove duplicate training examples:
# Deduplicate by entire content
removed = jsonl.deduplicate_jsonl(
"data_with_dupes.jsonl",
"data_deduped.jsonl"
)
# Deduplicate by specific field
removed = jsonl.deduplicate_jsonl(
"data_with_dupes.jsonl",
"data_deduped.jsonl",
key_fn=lambda x: x["prompt"], # Only check prompt field
keep='first' # or 'last'
)
print(f"Removed {removed} duplicates")
Sampling
Create random subsets:
# Sample by count
jsonl.sample_jsonl(
"full_dataset.jsonl",
"sample_1000.jsonl",
n=1000,
random_state=42 # for reproducibility
)
# Sample by fraction
jsonl.sample_jsonl(
"full_dataset.jsonl",
"sample_10pct.jsonl",
fraction=0.1,
random_state=42
)
Merging
Combine multiple files:
# Sequential merge
jsonl.merge_jsonl(
["train_part1.jsonl", "train_part2.jsonl", "train_part3.jsonl"],
"train_full.jsonl"
)
# Parallel merge (faster for many files)
jsonl.merge_jsonl(
input_files,
"train_full.jsonl",
parallel=True,
compress_output=True
)
File Statistics
Analyze dataset characteristics:
stats = jsonl.get_jsonl_stats("dataset.jsonl", sample_size=1000)
print(f"Lines: {stats['total_lines']:,}")
print(f"Size: {stats['total_bytes']:,} bytes")
print(f"Avg bytes/line: {stats['avg_bytes_per_line']:.1f}")
print(f"Keys: {stats['keys']}")
print(f"Compressed: {stats['compressed']}")
Advanced Streaming
Stream with filtering and transformation:
# Combined filtering and transformation
for batch in jsonl.stream_jsonl(
"data.jsonl.gz",
batch_size=5000,
filter_fn=lambda x: x["lang"] == "en",
transform_fn=lambda x: {**x, "processed": True},
skip_invalid=True
):
# Process filtered and transformed batch
train_model(batch)
Performance Tips
1. Choose Appropriate Batch Sizes
# Small files or low memory
batch_size = 100
# Medium files
batch_size = 1000
# Large files with plenty of RAM
batch_size = 10000
2. Use Parallel Processing Wisely
Parallel processing helps when:
Files are large (>100MB)
JSON parsing is complex
Transformations are CPU-intensive
Skip parallel processing when:
Files are small (<10MB)
I/O is the bottleneck
Simple JSON structures
3. Compression Selection
# Fast compression, moderate ratio
compression_type='gz' # Default, good balance
# Better compression, slower
compression_type='bz2' # 5-10% better compression
# Best compression, slowest
compression_type='xz' # 10-20% better compression
4. Memory Management
# Process very large files with limited memory
for batch in jsonl.stream_jsonl("huge.jsonl", batch_size=100):
process_and_discard(batch)
# Batch is garbage collected after each iteration
# Avoid loading entire file
data = jsonl.read_jsonl("huge.jsonl") # ❌ May exhaust memory
# Instead, use streaming
for batch in jsonl.stream_jsonl("huge.jsonl"): # ✅ Constant memory
process(batch)
Common Workflows
Prepare Training Dataset
from kerb.fine_tuning import jsonl
# 1. Merge raw data files
jsonl.merge_jsonl(
["raw_data_1.jsonl", "raw_data_2.jsonl", "raw_data_3.jsonl"],
"merged.jsonl"
)
# 2. Deduplicate
jsonl.deduplicate_jsonl(
"merged.jsonl",
"deduped.jsonl",
key_fn=lambda x: x["prompt"]
)
# 3. Filter by quality
jsonl.filter_jsonl(
"deduped.jsonl",
"filtered.jsonl",
filter_fn=lambda x: x.get("quality_score", 0) > 0.7
)
# 4. Sample for validation set
jsonl.sample_jsonl(
"filtered.jsonl",
"validation.jsonl",
fraction=0.1,
random_state=42
)
# 5. Compress final files
jsonl.compress_jsonl("filtered.jsonl", "training.jsonl.gz")
jsonl.compress_jsonl("validation.jsonl", "validation.jsonl.gz")
Process Large Dataset
# Process 100GB dataset with 8GB RAM
def expensive_preprocessing(item):
# Tokenization, embedding, etc.
return processed_item
# Stream in batches, transform in parallel
for i, batch in enumerate(jsonl.stream_jsonl("huge_dataset.jsonl", batch_size=1000)):
# Transform batch in parallel
processed_batch = parallel_transform(batch, expensive_preprocessing)
# Save incrementally
jsonl.append_jsonl(processed_batch, "processed_dataset.jsonl.gz")
if i % 100 == 0:
print(f"Processed {i * 1000:,} examples")
Analyze Dataset Quality
# Get overview
stats = jsonl.get_jsonl_stats("dataset.jsonl")
print(f"Total examples: {stats['total_lines']:,}")
# Check for issues
validation = jsonl.validate_jsonl("dataset.jsonl", parallel=True)
if not validation.is_valid:
print("Errors found:")
for error in validation.errors:
print(f" - {error}")
# Analyze content
text_lengths = []
for batch in jsonl.stream_jsonl("dataset.jsonl", batch_size=1000):
for item in batch:
if "text" in item:
text_lengths.append(len(item["text"]))
print(f"Avg text length: {sum(text_lengths) / len(text_lengths):.0f}")
print(f"Min: {min(text_lengths)}, Max: {max(text_lengths)}")
Benchmarks
Performance improvements over basic implementation:
Operation |
Small (1K) |
Medium (100K) |
Large (1M) |
Huge (10M) |
|---|---|---|---|---|
Read |
1.0x |
2.5x |
3.8x |
4.2x |
Write (compressed) |
0.9x |
0.85x |
0.8x |
0.75x |
Filter |
1.0x |
1.2x |
1.5x |
1.8x |
Transform (parallel) |
1.0x |
2.0x |
3.5x |
3.8x |
Validation (parallel) |
1.0x |
2.2x |
3.0x |
3.5x |
Benchmarks on 8-core CPU with SSD storage
Migration Guide
From Basic Version
# Old way
data = jsonl.read_jsonl("data.jsonl")
# New way (same API, supports compression)
data = jsonl.read_jsonl("data.jsonl.gz")
# New way (faster for large files)
data = jsonl.parallel_read_jsonl("data.jsonl", num_workers=4)
# New way (memory-efficient for huge files)
for batch in jsonl.stream_jsonl("data.jsonl", batch_size=1000):
process(batch)
All existing code continues to work while gaining:
Automatic compression support
Better error handling
Optional performance features
Best Practices
Compress Training Data: Always compress large datasets
jsonl.write_jsonl(data, "data.jsonl", compress=True)
Stream Large Files: Don’t load huge files into memory
for batch in jsonl.stream_jsonl("huge.jsonl"): process(batch)
Validate Before Training: Check data quality
result = jsonl.validate_jsonl("data.jsonl", parallel=True) assert result.is_valid
Deduplicate: Remove duplicate examples
jsonl.deduplicate_jsonl("data.jsonl", "clean.jsonl")
Split for Distribution: Split large files for parallel processing
chunks = jsonl.split_jsonl("data.jsonl", "chunk", split_size=10000)