Source code for kerb.parsing.validation

"""Output validation and fixing utilities.

This module provides comprehensive validation of LLM outputs and retry mechanisms
with progressive fixes.
"""

from typing import Any, Callable, Dict, Optional, Type

from .code import extract_code_blocks
from .json import (extract_json, extract_json_array, extract_json_object,
                   fix_json)
from .schema import validate_json_schema
from .types import ParseMode, ParseResult, ValidationResult


def validate_output(
    text: str,
    output_type: str,
    schema: Optional[Dict[str, Any]] = None,
    model_class: Optional[Type] = None,
    custom_validator: Optional[Callable[[Any], bool]] = None,
) -> ValidationResult:
    """Validate LLM output against expected format.

    The text is first parsed according to ``output_type``. If parsing
    succeeded, the parsed data is then checked against ``schema`` (when
    given) and finally against ``custom_validator`` (when given). Each
    later stage only runs if no earlier stage reported an error.

    Args:
        text (str): LLM output text
        output_type (str): Expected type ('json', 'json_array',
            'json_object', 'pydantic', 'code')
        schema (Dict, optional): JSON Schema for validation
        model_class (Type, optional): Pydantic model class for validation;
            required when ``output_type`` is 'pydantic'
        custom_validator (Callable, optional): Custom validation function.
            A falsy return value or a raised exception is recorded as a
            validation error.

    Returns:
        ValidationResult: Validation result with errors/warnings and the
        parsed data (``None`` when parsing failed)
    """
    errors = []
    warnings = []
    data = None

    # The three JSON flavours share identical result handling.
    json_extractors = {
        "json": extract_json,
        "json_array": extract_json_array,
        "json_object": extract_json_object,
    }
    extractor = (
        json_extractors.get(output_type) if isinstance(output_type, str) else None
    )

    # Parse based on type
    result = None
    if extractor is not None:
        result = extractor(text)
    elif output_type == "pydantic":
        if not model_class:
            errors.append("model_class required for pydantic validation")
        else:
            from .pydantic import parse_to_pydantic

            result = parse_to_pydantic(text, model_class)
    elif output_type == "code":
        blocks = extract_code_blocks(text)
        if not blocks:
            errors.append("No code blocks found in output")
        else:
            data = blocks
    else:
        errors.append(f"Unknown output_type: {output_type}")

    if result is not None:
        if not result.success:
            errors.append(result.error)
        else:
            data = result.data
            warnings.extend(result.warnings)

    # Compare against None rather than relying on truthiness: successfully
    # parsed but falsy payloads such as {} or [] must still be validated.
    has_data = data is not None

    # Schema validation
    if schema and has_data and not errors:
        # Model instances exposing model_dump() are converted to plain data
        payload = data.model_dump() if hasattr(data, "model_dump") else data
        schema_result = validate_json_schema(payload, schema)
        if not schema_result.valid:
            errors.extend(schema_result.errors)

    # Custom validation
    if custom_validator and has_data and not errors:
        try:
            if not custom_validator(data):
                errors.append("Custom validation failed")
        except Exception as e:
            # A crashing validator is reported as a validation error
            # instead of propagating to the caller.
            errors.append(f"Custom validation error: {str(e)}")

    return ValidationResult(
        valid=not errors, errors=errors, warnings=warnings, data=data
    )
def retry_parse_with_fixes(
    text: str, parser_func: Callable[[str], ParseResult], max_attempts: int = 3
) -> ParseResult:
    """Retry parsing with increasingly aggressive fixes.

    The parser is called once per attempt, stepping through
    ``ParseMode.STRICT``, ``ParseMode.LENIENT`` and ``ParseMode.BEST_EFFORT``
    in that order. Between attempts, if ``text`` is a string containing
    ``{`` or ``[``, ``fix_json`` is tried and its result is returned as-is
    when it succeeds.

    Args:
        text (str): Text to parse
        parser_func (Callable): Parser function to use. It is called as
            ``parser_func(text, mode=mode)``; if that raises ``TypeError``
            it is called again as ``parser_func(text)``.
        max_attempts (int): Maximum retry attempts. Values below 1 are
            treated as 1; values above the number of parse modes have no
            additional effect on the number of parser calls.

    Returns:
        ParseResult: The first successful result, or the result of the
        last parser call when every attempt failed
    """
    modes = [ParseMode.STRICT, ParseMode.LENIENT, ParseMode.BEST_EFFORT]

    # Always make at least one attempt; otherwise there would be no result
    # to return for max_attempts <= 0.
    attempts = max(1, max_attempts)

    # Loop-invariant: only text that looks like it contains JSON is worth
    # passing to fix_json.
    looks_like_json = isinstance(text, str) and ("{" in text or "[" in text)

    result = None
    for i, mode in enumerate(modes[:attempts]):
        try:
            # If parser supports mode parameter
            result = parser_func(text, mode=mode)
        except TypeError:
            # Parser doesn't support mode parameter
            result = parser_func(text)

        if result.success:
            return result

        # If not last attempt, try fixing
        if looks_like_json and i < attempts - 1:
            fix_result = fix_json(text)
            if fix_result.success:
                return fix_result

    return result