# src/leaderboard.py
import pandas as pd
from datasets import Dataset, load_dataset
import json
import datetime
from typing import Dict, List, Optional, Tuple
import os

from config import LEADERBOARD_DATASET, HF_TOKEN, ALL_UG40_LANGUAGES, GOOGLE_SUPPORTED_LANGUAGES
from src.utils import create_submission_id, sanitize_model_name, get_all_language_pairs, get_google_comparable_pairs


def initialize_leaderboard() -> pd.DataFrame:
    """Initialize empty leaderboard DataFrame."""
    columns = {
        'submission_id': [],
        'model_name': [],
        'author': [],
        'submission_date': [],
        'model_type': [],
        'description': [],

        # Primary metrics
        'quality_score': [],
        'bleu': [],
        'chrf': [],

        # Secondary metrics
        'rouge1': [],
        'rouge2': [],
        'rougeL': [],
        'cer': [],
        'wer': [],
        'len_ratio': [],

        # Google comparable metrics
        'google_quality_score': [],
        'google_bleu': [],
        'google_chrf': [],

        # Coverage info
        'total_samples': [],
        'language_pairs_covered': [],
        'google_pairs_covered': [],
        'coverage_rate': [],

        # Detailed results
        'detailed_metrics': [],  # JSON string
        'validation_report': [],

        # Metadata
        'evaluation_date': [],
        'leaderboard_version': []
    }
    return pd.DataFrame(columns)


def load_leaderboard() -> pd.DataFrame:
    """Load current leaderboard from HuggingFace dataset."""
    try:
        print("Loading leaderboard...")
        dataset = load_dataset(LEADERBOARD_DATASET, split='train')
        df = dataset.to_pandas()

        # Ensure all required columns exist
        required_columns = list(initialize_leaderboard().columns)
        for col in required_columns:
            if col not in df.columns:
                if col in ['quality_score', 'bleu', 'chrf', 'rouge1', 'rouge2', 'rougeL',
                           'cer', 'wer', 'len_ratio', 'google_quality_score', 'google_bleu',
                           'google_chrf', 'total_samples', 'language_pairs_covered',
                           'google_pairs_covered', 'coverage_rate']:
                    df[col] = 0.0
                elif col in ['leaderboard_version']:
                    df[col] = 1
                else:
                    df[col] = ''

        print(f"Loaded leaderboard with {len(df)} entries")
        return df

    except Exception as e:
        print(f"Could not load leaderboard: {e}")
        print("Initializing empty leaderboard...")
        return initialize_leaderboard()


def save_leaderboard(df: pd.DataFrame) -> bool:
    """Save leaderboard to HuggingFace dataset."""
    try:
        # Clean data before saving
        df_clean = df.copy()

        # Ensure numeric columns are proper types
        numeric_columns = ['quality_score', 'bleu', 'chrf', 'rouge1', 'rouge2', 'rougeL',
                           'cer', 'wer', 'len_ratio', 'google_quality_score', 'google_bleu',
                           'google_chrf', 'total_samples', 'language_pairs_covered',
                           'google_pairs_covered', 'coverage_rate', 'leaderboard_version']

        for col in numeric_columns:
            if col in df_clean.columns:
                df_clean[col] = pd.to_numeric(df_clean[col], errors='coerce').fillna(0.0)

        # Convert to dataset
        dataset = Dataset.from_pandas(df_clean)

        # Push to hub
        dataset.push_to_hub(
            LEADERBOARD_DATASET,
            token=HF_TOKEN,
            commit_message=f"Update leaderboard - {datetime.datetime.now().isoformat()[:19]}"
        )

        print("Leaderboard saved successfully!")
        return True

    except Exception as e:
        print(f"Error saving leaderboard: {e}")
        return False


def add_model_to_leaderboard(
    model_name: str,
    author: str,
    evaluation_results: Dict,
    validation_info: Dict,
    model_type: str = "",
    description: str = ""
) -> pd.DataFrame:
    """
    Add new model results to leaderboard, with JSON-safe detailed_metrics.
""" # Load current leaderboard df = load_leaderboard() # Remove existing entry if present existing_mask = df['model_name'] == model_name if existing_mask.any(): df = df[~existing_mask] # Safely serialize evaluation_results by dropping non-JSON types safe_results = evaluation_results.copy() # Remove sample_metrics DataFrame which isn't JSON serializable if 'sample_metrics' in safe_results: safe_results.pop('sample_metrics') detailed_json = json.dumps(safe_results) # Extract metrics averages = evaluation_results.get('averages', {}) google_averages = evaluation_results.get('google_comparable_averages', {}) summary = evaluation_results.get('summary', {}) # Prepare new entry new_entry = { 'submission_id': create_submission_id(), 'model_name': sanitize_model_name(model_name), 'author': author[:100] if author else 'Anonymous', 'submission_date': datetime.datetime.now().isoformat(), 'model_type': model_type[:50] if model_type else 'unknown', 'description': description[:500] if description else '', # Primary metrics 'quality_score': float(averages.get('quality_score', 0.0)), 'bleu': float(averages.get('bleu', 0.0)), 'chrf': float(averages.get('chrf', 0.0)), # Secondary metrics 'rouge1': float(averages.get('rouge1', 0.0)), 'rouge2': float(averages.get('rouge2', 0.0)), 'rougeL': float(averages.get('rougeL', 0.0)), 'cer': float(averages.get('cer', 0.0)), 'wer': float(averages.get('wer', 0.0)), 'len_ratio': float(averages.get('len_ratio', 0.0)), # Google comparable metrics 'google_quality_score': float(google_averages.get('quality_score', 0.0)), 'google_bleu': float(google_averages.get('bleu', 0.0)), 'google_chrf': float(google_averages.get('chrf', 0.0)), # Coverage info 'total_samples': int(summary.get('total_samples', 0)), 'language_pairs_covered': int(summary.get('language_pairs_covered', 0)), 'google_pairs_covered': int(summary.get('google_comparable_pairs', 0)), 'coverage_rate': float(validation_info.get('coverage', 0.0)), # Detailed results (JSON string) 'detailed_metrics': detailed_json, 'validation_report': validation_info.get('report', ''), # Metadata 'evaluation_date': datetime.datetime.now().isoformat(), 'leaderboard_version': 1 } # Convert to DataFrame and append new_row_df = pd.DataFrame([new_entry]) updated_df = pd.concat([df, new_row_df], ignore_index=True) updated_df = updated_df.sort_values('quality_score', ascending=False).reset_index(drop=True) # Save to hub save_leaderboard(updated_df) return updated_df def prepare_leaderboard_display(df: pd.DataFrame) -> pd.DataFrame: """Prepare leaderboard for display by formatting and selecting appropriate columns.""" if df.empty: return df # Select columns for display (exclude detailed_metrics and validation_report) display_columns = [ 'model_name', 'author', 'submission_date', 'model_type', 'quality_score', 'bleu', 'chrf', 'rouge1', 'rougeL', 'total_samples', 'language_pairs_covered', 'google_pairs_covered', 'coverage_rate' ] # Only include columns that exist available_columns = [col for col in display_columns if col in df.columns] display_df = df[available_columns].copy() # Format numeric columns numeric_format = { 'quality_score': '{:.4f}', 'bleu': '{:.2f}', 'chrf': '{:.4f}', 'rouge1': '{:.4f}', 'rougeL': '{:.4f}', 'coverage_rate': '{:.1%}', } for col, fmt in numeric_format.items(): if col in display_df.columns: display_df[col] = display_df[col].apply(lambda x: fmt.format(float(x)) if pd.notnull(x) else "0.0000") # Format submission date if 'submission_date' in display_df.columns: display_df['submission_date'] = 
            pd.to_datetime(display_df['submission_date'])
            .dt.strftime('%Y-%m-%d %H:%M')
        )

    # Rename columns for better display
    column_renames = {
        'model_name': 'Model Name',
        'author': 'Author',
        'submission_date': 'Submitted',
        'model_type': 'Type',
        'quality_score': 'Quality Score',
        'bleu': 'BLEU',
        'chrf': 'ChrF',
        'rouge1': 'ROUGE-1',
        'rougeL': 'ROUGE-L',
        'total_samples': 'Samples',
        'language_pairs_covered': 'Lang Pairs',
        'google_pairs_covered': 'Google Pairs',
        'coverage_rate': 'Coverage'
    }
    display_df = display_df.rename(columns=column_renames)

    return display_df


def get_leaderboard_stats(df: pd.DataFrame) -> Dict:
    """Get summary statistics for the leaderboard."""
    if df.empty:
        return {
            'total_models': 0,
            'avg_quality_score': 0.0,
            'best_model': None,
            'latest_submission': None,
            'google_comparable_models': 0,
            'coverage_distribution': {},
            'language_pair_coverage': {}
        }

    # Basic stats
    stats = {
        'total_models': len(df),
        'avg_quality_score': float(df['quality_score'].mean()),
        'best_model': {
            'name': df.iloc[0]['model_name'],
            'score': float(df.iloc[0]['quality_score']),
            'author': df.iloc[0]['author']
        } if len(df) > 0 else None,
        'latest_submission': df['submission_date'].max() if len(df) > 0 else None
    }

    # Google comparable models
    stats['google_comparable_models'] = int((df['google_pairs_covered'] > 0).sum())

    # Coverage distribution
    coverage_bins = pd.cut(
        df['coverage_rate'],
        bins=[0, 0.5, 0.8, 0.95, 1.0],
        labels=['<50%', '50-80%', '80-95%', '95-100%']
    )
    stats['coverage_distribution'] = coverage_bins.value_counts().to_dict()

    # Language pair coverage
    if len(df) > 0:
        stats['avg_pairs_covered'] = float(df['language_pairs_covered'].mean())
        stats['max_pairs_covered'] = int(df['language_pairs_covered'].max())
        stats['total_possible_pairs'] = len(get_all_language_pairs())

    return stats


def filter_leaderboard(
    df: pd.DataFrame,
    search_query: str = "",
    model_type: str = "",
    min_coverage: float = 0.0,
    google_comparable_only: bool = False,
    top_n: Optional[int] = None
) -> pd.DataFrame:
    """Filter leaderboard based on various criteria."""
    filtered_df = df.copy()

    # Text search
    if search_query:
        query_lower = search_query.lower()
        mask = (
            filtered_df['model_name'].str.lower().str.contains(query_lower, na=False) |
            filtered_df['author'].str.lower().str.contains(query_lower, na=False) |
            filtered_df['description'].str.lower().str.contains(query_lower, na=False)
        )
        filtered_df = filtered_df[mask]

    # Model type filter
    if model_type and model_type != "all":
        filtered_df = filtered_df[filtered_df['model_type'] == model_type]

    # Coverage filter
    if min_coverage > 0:
        filtered_df = filtered_df[filtered_df['coverage_rate'] >= min_coverage]

    # Google comparable filter
    if google_comparable_only:
        filtered_df = filtered_df[filtered_df['google_pairs_covered'] > 0]

    # Top N filter
    if top_n:
        filtered_df = filtered_df.head(top_n)

    return filtered_df


def get_model_comparison(df: pd.DataFrame, model_names: List[str]) -> Dict:
    """Get detailed comparison between specific models."""
    models = df[df['model_name'].isin(model_names)]

    if len(models) == 0:
        return {'error': 'No models found'}

    comparison = {
        'models': [],
        'metrics_comparison': {},
        'detailed_results': {}
    }

    # Extract basic info for each model
    for _, model in models.iterrows():
        comparison['models'].append({
            'name': model['model_name'],
            'author': model['author'],
            'submission_date': model['submission_date'],
            'model_type': model['model_type']
        })

        # Parse detailed metrics if available
        try:
            detailed = json.loads(model['detailed_metrics'])
            comparison['detailed_results'][model['model_name']] = detailed
        except (TypeError, json.JSONDecodeError):
            comparison['detailed_results'][model['model_name']] = {}

    # Compare metrics
    metrics = ['quality_score', 'bleu', 'chrf', 'rouge1', 'rougeL', 'cer', 'wer']
    for metric in metrics:
        if metric in models.columns:
            comparison['metrics_comparison'][metric] = {
                model_name: float(score)
                for model_name, score in zip(models['model_name'], models[metric])
            }

    return comparison


def export_leaderboard(df: pd.DataFrame, format: str = 'csv', include_detailed: bool = False) -> str:
    """Export leaderboard in specified format."""
    timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")

    # Select columns for export
    if include_detailed:
        export_df = df.copy()
    else:
        basic_columns = [
            'model_name', 'author', 'submission_date', 'model_type',
            'quality_score', 'bleu', 'chrf', 'rouge1', 'rougeL',
            'total_samples', 'language_pairs_covered', 'coverage_rate'
        ]
        export_df = df[basic_columns].copy()

    if format == 'csv':
        filename = f"salt_leaderboard_{timestamp}.csv"
        export_df.to_csv(filename, index=False)
    elif format == 'json':
        filename = f"salt_leaderboard_{timestamp}.json"
        export_df.to_json(filename, orient='records', indent=2)
    elif format == 'xlsx':
        filename = f"salt_leaderboard_{timestamp}.xlsx"
        export_df.to_excel(filename, index=False)
    else:
        raise ValueError(f"Unsupported format: {format}")

    return filename


def get_ranking_history(df: pd.DataFrame, model_name: str) -> Dict:
    """Get ranking history for a specific model (if multiple submissions)."""
    model_entries = df[df['model_name'] == model_name].sort_values('submission_date')

    if len(model_entries) == 0:
        return {'error': 'Model not found'}

    history = []
    for _, entry in model_entries.iterrows():
        # Calculate rank at time of submission
        submission_date = entry['submission_date']
        historical_df = df[df['submission_date'] <= submission_date]
        rank = (historical_df['quality_score'] > entry['quality_score']).sum() + 1

        history.append({
            'submission_date': submission_date,
            'quality_score': float(entry['quality_score']),
            'rank': int(rank),
            'total_models': len(historical_df)
        })

    return {
        'model_name': model_name,
        'history': history,
        'current_rank': history[-1]['rank'] if history else None
    }
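

# A minimal usage sketch of this module, added for illustration only. It assumes the
# dataset named by LEADERBOARD_DATASET is readable (otherwise load_leaderboard() falls
# back to an empty frame); nothing is pushed back to the Hub here.
if __name__ == "__main__":
    board = load_leaderboard()

    # Show the top 10 models that cover at least 80% of the language pairs.
    top_models = filter_leaderboard(board, min_coverage=0.8, top_n=10)
    print(prepare_leaderboard_display(top_models))

    # Print aggregate statistics for the current leaderboard.
    print(get_leaderboard_stats(board))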