# src/evaluation.py
import pandas as pd
import numpy as np
from sacrebleu.metrics import BLEU, CHRF
from rouge_score import rouge_scorer
import Levenshtein
from collections import defaultdict
from transformers.models.whisper.english_normalizer import BasicTextNormalizer
from typing import Dict, List, Tuple

from config import ALL_UG40_LANGUAGES, GOOGLE_SUPPORTED_LANGUAGES, METRICS_CONFIG
from src.utils import get_all_language_pairs, get_google_comparable_pairs


def calculate_sentence_metrics(reference: str, prediction: str) -> Dict[str, float]:
    """Calculate all metrics for a single sentence pair - fixed to match the reference implementation."""
    # Handle empty or non-string inputs
    if not prediction or not isinstance(prediction, str):
        prediction = ""
    if not reference or not isinstance(reference, str):
        reference = ""

    # Normalize texts
    normalizer = BasicTextNormalizer()
    pred_norm = normalizer(prediction)
    ref_norm = normalizer(reference)

    metrics = {}

    # BLEU score (keep as 0-100 scale initially)
    try:
        bleu = BLEU(effective_order=True)
        metrics['bleu'] = bleu.sentence_score(pred_norm, [ref_norm]).score
    except Exception:
        metrics['bleu'] = 0.0

    # ChrF score (normalize to 0-1)
    try:
        chrf = CHRF()
        metrics['chrf'] = chrf.sentence_score(pred_norm, [ref_norm]).score / 100.0
    except Exception:
        metrics['chrf'] = 0.0

    # Character Error Rate (CER)
    try:
        if len(ref_norm) > 0:
            metrics['cer'] = Levenshtein.distance(ref_norm, pred_norm) / len(ref_norm)
        else:
            metrics['cer'] = 1.0 if len(pred_norm) > 0 else 0.0
    except Exception:
        metrics['cer'] = 1.0

    # Word Error Rate (WER). Note: recent rapidfuzz-backed releases of the
    # `Levenshtein` package accept token lists as well as strings.
    try:
        ref_words = ref_norm.split()
        pred_words = pred_norm.split()
        if len(ref_words) > 0:
            metrics['wer'] = Levenshtein.distance(ref_words, pred_words) / len(ref_words)
        else:
            metrics['wer'] = 1.0 if len(pred_words) > 0 else 0.0
    except Exception:
        metrics['wer'] = 1.0

    # Length ratio
    try:
        if len(ref_norm) > 0:
            metrics['len_ratio'] = len(pred_norm) / len(ref_norm)
        else:
            metrics['len_ratio'] = 1.0 if len(pred_norm) == 0 else float('inf')
    except Exception:
        metrics['len_ratio'] = 1.0

    # ROUGE scores
    try:
        scorer = rouge_scorer.RougeScorer(['rouge1', 'rouge2', 'rougeL'], use_stemmer=True)
        rouge_scores = scorer.score(ref_norm, pred_norm)
        metrics['rouge1'] = rouge_scores['rouge1'].fmeasure
        metrics['rouge2'] = rouge_scores['rouge2'].fmeasure
        metrics['rougeL'] = rouge_scores['rougeL'].fmeasure
    except Exception:
        metrics['rouge1'] = 0.0
        metrics['rouge2'] = 0.0
        metrics['rougeL'] = 0.0

    # Quality score (composite metric) - fixed to match the reference implementation
    try:
        quality_components = [
            metrics['bleu'] / 100.0,         # Normalize BLEU to 0-1
            metrics['chrf'],                 # Already 0-1
            1.0 - min(metrics['cer'], 1.0),  # Invert error rates
            1.0 - min(metrics['wer'], 1.0),
            metrics['rouge1'],
            metrics['rougeL'],
        ]
        metrics['quality_score'] = np.mean(quality_components)
    except Exception as e:
        # Fallback without ROUGE
        print(f"Error calculating quality score: {e}")
        try:
            fallback_components = [
                metrics['bleu'] / 100.0,
                metrics['chrf'],
                1.0 - min(metrics['cer'], 1.0),
                1.0 - min(metrics['wer'], 1.0),
            ]
            metrics['quality_score'] = np.mean(fallback_components)
        except Exception:
            metrics['quality_score'] = 0.0

    return metrics


def evaluate_predictions(predictions: pd.DataFrame, test_set: pd.DataFrame) -> Dict:
    """Evaluate predictions against test set targets."""
    print("Starting evaluation...")

    # Merge predictions with test set (which contains targets)
    merged = test_set.merge(
        predictions,
        on='sample_id',
        how='inner',
        suffixes=('', '_pred')
    )

    if len(merged) == 0:
        return {
            'error': 'No matching samples found between predictions and test set',
            'evaluated_samples': 0
        }

    print(f"Evaluating {len(merged)} samples...")
    # Calculate metrics for each sample
    sample_metrics = []
    for _, row in merged.iterrows():
        metrics = calculate_sentence_metrics(row['target_text'], row['prediction'])
        metrics['sample_id'] = row['sample_id']
        metrics['source_language'] = row['source_language']
        metrics['target_language'] = row['target_language']
        metrics['google_comparable'] = row.get('google_comparable', False)
        sample_metrics.append(metrics)

    sample_df = pd.DataFrame(sample_metrics)

    # Aggregate by language pairs - fixed aggregation
    pair_metrics = {}
    overall_metrics = defaultdict(list)
    google_comparable_metrics = defaultdict(list)

    # Calculate metrics for each language pair
    for src_lang in ALL_UG40_LANGUAGES:
        for tgt_lang in ALL_UG40_LANGUAGES:
            if src_lang != tgt_lang:
                pair_data = sample_df[
                    (sample_df['source_language'] == src_lang) &
                    (sample_df['target_language'] == tgt_lang)
                ]

                if len(pair_data) > 0:
                    pair_key = f"{src_lang}_to_{tgt_lang}"
                    pair_metrics[pair_key] = {}

                    # Calculate averages for this pair
                    for metric in METRICS_CONFIG['primary_metrics'] + METRICS_CONFIG['secondary_metrics']:
                        if metric in pair_data.columns:
                            # Filter out invalid values
                            valid_values = pair_data[metric].replace([np.inf, -np.inf], np.nan).dropna()
                            if len(valid_values) > 0:
                                avg_value = float(valid_values.mean())
                                pair_metrics[pair_key][metric] = avg_value

                                # Add to overall averages
                                overall_metrics[metric].append(avg_value)

                                # Add to Google comparable if applicable
                                if (src_lang in GOOGLE_SUPPORTED_LANGUAGES and
                                        tgt_lang in GOOGLE_SUPPORTED_LANGUAGES):
                                    google_comparable_metrics[metric].append(avg_value)

                    pair_metrics[pair_key]['sample_count'] = len(pair_data)

    # Calculate overall averages
    averages = {}
    for metric in overall_metrics:
        if overall_metrics[metric]:
            averages[metric] = float(np.mean(overall_metrics[metric]))
        else:
            averages[metric] = 0.0

    # Calculate Google comparable averages
    google_averages = {}
    for metric in google_comparable_metrics:
        if google_comparable_metrics[metric]:
            google_averages[metric] = float(np.mean(google_comparable_metrics[metric]))
        else:
            google_averages[metric] = 0.0

    # Generate evaluation summary
    summary = {
        'total_samples': len(sample_df),
        'language_pairs_covered': len([
            k for k in pair_metrics
            if pair_metrics[k].get('sample_count', 0) > 0
        ]),
        'google_comparable_pairs': len([
            k for k in pair_metrics
            if '_to_' in k
            and k.split('_to_')[0] in GOOGLE_SUPPORTED_LANGUAGES
            and k.split('_to_')[1] in GOOGLE_SUPPORTED_LANGUAGES
            and pair_metrics[k].get('sample_count', 0) > 0
        ]),
        'primary_metrics': {metric: averages.get(metric, 0.0) for metric in METRICS_CONFIG['primary_metrics']},
        'secondary_metrics': {metric: averages.get(metric, 0.0) for metric in METRICS_CONFIG['secondary_metrics']}
    }

    return {
        'sample_metrics': sample_df,
        'pair_metrics': pair_metrics,
        'averages': averages,
        'google_comparable_averages': google_averages,
        'summary': summary,
        'evaluated_samples': len(sample_df),
        'error': None
    }
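

# The helper below is an added convenience sketch, not part of the original module:
# it flattens the nested `pair_metrics` dict returned by evaluate_predictions() into
# a tidy DataFrame (one row per language pair) for inspection or CSV export. The
# function name is hypothetical; the "{src}_to_{tgt}" key format matches the code above.
def pair_metrics_to_dataframe(pair_metrics: Dict) -> pd.DataFrame:
    """Flatten per-pair metric dicts into a DataFrame, one row per language pair."""
    rows = []
    for pair_key, metrics in pair_metrics.items():
        # Pair keys are built above as "{src}_to_{tgt}"
        src_lang, tgt_lang = pair_key.split('_to_')
        row = {'source_language': src_lang, 'target_language': tgt_lang}
        row.update(metrics)
        rows.append(row)
    return pd.DataFrame(rows)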


def compare_with_baseline(results: Dict, baseline_results: Dict = None) -> Dict:
    """Compare results with baseline (e.g., Google Translate)."""
    if baseline_results is None:
        return {
            'comparison_available': False,
            'message': 'No baseline available for comparison'
        }

    comparison = {
        'comparison_available': True,
        'overall_comparison': {},
        'pair_comparisons': {},
        'better_pairs': [],
        'worse_pairs': []
    }

    # Compare overall metrics
    for metric in METRICS_CONFIG['primary_metrics']:
        if metric in results['averages'] and metric in baseline_results['averages']:
            user_score = results['averages'][metric]
            baseline_score = baseline_results['averages'][metric]

            # For error metrics (cer, wer), lower is better
            if metric in ['cer', 'wer']:
                improvement = baseline_score - user_score  # Positive = improvement
            else:
                improvement = user_score - baseline_score  # Positive = improvement

            comparison['overall_comparison'][metric] = {
                'user_score': user_score,
                'baseline_score': baseline_score,
                'improvement': improvement,
                'improvement_percent': (improvement / max(baseline_score, 0.001)) * 100
            }

    # Compare by language pairs (only Google comparable ones)
    google_pairs = [
        k for k in results['pair_metrics']
        if '_to_' in k
        and k.split('_to_')[0] in GOOGLE_SUPPORTED_LANGUAGES
        and k.split('_to_')[1] in GOOGLE_SUPPORTED_LANGUAGES
    ]

    for pair in google_pairs:
        if pair in baseline_results['pair_metrics']:
            pair_comparison = {}

            for metric in METRICS_CONFIG['primary_metrics']:
                if (metric in results['pair_metrics'][pair] and
                        metric in baseline_results['pair_metrics'][pair]):
                    user_score = results['pair_metrics'][pair][metric]
                    baseline_score = baseline_results['pair_metrics'][pair][metric]

                    if metric in ['cer', 'wer']:
                        improvement = baseline_score - user_score
                    else:
                        improvement = user_score - baseline_score

                    pair_comparison[metric] = {
                        'user_score': user_score,
                        'baseline_score': baseline_score,
                        'improvement': improvement
                    }

            comparison['pair_comparisons'][pair] = pair_comparison

            # Determine if this pair is better or worse overall
            quality_improvement = pair_comparison.get('quality_score', {}).get('improvement', 0)
            if quality_improvement > 0.01:  # Threshold for significance
                comparison['better_pairs'].append(pair)
            elif quality_improvement < -0.01:
                comparison['worse_pairs'].append(pair)

    return comparison


def generate_evaluation_report(results: Dict, model_name: str = "", comparison: Dict = None) -> str:
    """Generate human-readable evaluation report."""
    if results.get('error'):
        return f"❌ **Evaluation Error**: {results['error']}"

    report = []

    # Header
    report.append(f"## Evaluation Report: {model_name or 'Submission'}")
    report.append("")

    # Summary
    summary = results['summary']
    report.append("### 📊 Summary")
    report.append(f"- **Total Samples Evaluated**: {summary['total_samples']:,}")
    report.append(f"- **Language Pairs Covered**: {summary['language_pairs_covered']}")
    report.append(f"- **Google Comparable Pairs**: {summary['google_comparable_pairs']}")
    report.append("")

    # Primary metrics
    report.append("### 🎯 Primary Metrics")
    for metric, value in summary['primary_metrics'].items():
        formatted_value = f"{value:.4f}" if metric != 'bleu' else f"{value:.2f}"
        report.append(f"- **{metric.upper()}**: {formatted_value}")

    # Quality ranking (if comparison available)
    if comparison and comparison.get('comparison_available'):
        quality_comp = comparison['overall_comparison'].get('quality_score', {})
        if quality_comp:
            improvement = quality_comp.get('improvement', 0)
            if improvement > 0.01:
                report.append(f"  - 🟢 **{improvement:.3f}** better than baseline")
            elif improvement < -0.01:
                report.append(f"  - 🔴 **{abs(improvement):.3f}** worse than baseline")
            else:
                report.append("  - 🟡 Similar to baseline")

    report.append("")

    # Secondary metrics
    report.append("### 📈 Secondary Metrics")
    for metric, value in summary['secondary_metrics'].items():
        formatted_value = f"{value:.4f}"
        report.append(f"- **{metric.upper()}**: {formatted_value}")

    report.append("")

    # Language pair performance (top and bottom pairs)
    pair_metrics = results['pair_metrics']
    if pair_metrics:
        # Sort pairs by quality score
        sorted_pairs = sorted(
            [(k, v.get('quality_score', 0)) for k, v in pair_metrics.items()
             if v.get('sample_count', 0) > 0],
            key=lambda x: x[1],
            reverse=True
        )

        if sorted_pairs:
            report.append("### 🏆 Best Performing Language Pairs")
            for pair, score in sorted_pairs[:5]:
                src, tgt = pair.split('_to_')
                report.append(f"- **{src} → {tgt}**: {score:.3f}")

            if len(sorted_pairs) > 5:
                report.append("")
                report.append("### 📉 Challenging Language Pairs")
                for pair, score in sorted_pairs[-3:]:
                    src, tgt = pair.split('_to_')
                    report.append(f"- **{src} → {tgt}**: {score:.3f}")

    # Comparison with baseline
    if comparison and comparison.get('comparison_available'):
        report.append("")
        report.append("### 🔍 Comparison with Baseline")

        better_count = len(comparison.get('better_pairs', []))
        worse_count = len(comparison.get('worse_pairs', []))
        total_comparable = len(comparison.get('pair_comparisons', {}))

        if total_comparable > 0:
            report.append(f"- **Better than baseline**: {better_count}/{total_comparable} pairs")
            report.append(f"- **Worse than baseline**: {worse_count}/{total_comparable} pairs")

            if comparison['better_pairs']:
                report.append("  - Strong pairs: " + ", ".join(comparison['better_pairs'][:3]))
            if comparison['worse_pairs']:
                report.append("  - Weak pairs: " + ", ".join(comparison['worse_pairs'][:3]))

    return "\n".join(report)


def create_sample_analysis(results: Dict, n_samples: int = 10) -> pd.DataFrame:
    """Create sample analysis showing best and worst translations."""
    if 'sample_metrics' not in results:
        return pd.DataFrame()

    sample_df = results['sample_metrics']

    # Get best and worst samples by quality score
    best_samples = sample_df.nlargest(n_samples // 2, 'quality_score')
    worst_samples = sample_df.nsmallest(n_samples // 2, 'quality_score')

    analysis_samples = pd.concat([best_samples, worst_samples])

    # Add category label
    analysis_samples['category'] = ['Best'] * len(best_samples) + ['Worst'] * len(worst_samples)

    return analysis_samples[['sample_id', 'source_language', 'target_language',
                             'quality_score', 'bleu', 'chrf', 'category']]


def get_google_translate_baseline() -> Dict:
    """Get Google Translate baseline results (if available)."""
    try:
        # This would load pre-computed Google Translate results.
        # For now, return an empty dict; implement when the Google Translate baseline is available.
        return {}
    except Exception:
        return {}
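

if __name__ == "__main__":
    # Illustrative smoke test only (an addition, not part of the original module):
    # exercises calculate_sentence_metrics() on a toy reference/prediction pair so the
    # metric scales (BLEU on 0-100, everything else on 0-1) can be sanity-checked.
    # Running the full evaluate_predictions() pipeline additionally requires the
    # language codes and metric lists defined in config.py.
    example_metrics = calculate_sentence_metrics(
        reference="the quick brown fox jumps over the lazy dog",
        prediction="the quick brown fox jumped over the dog",
    )
    for name, value in sorted(example_metrics.items()):
        print(f"{name}: {value:.4f}")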