import os
import re
import subprocess
import time

import pandas as pd
import GPUtil
from huggingface_hub import snapshot_download
from transformers import AutoConfig
try:
    from src.display.utils import GPU_TEMP, GPU_Mem, GPU_Power, GPU_Util, GPU_Name
except ImportError:
    # Fallback import path when the module is run outside the package root.
    print("local debug: from display.utils")
    from display.utils import GPU_TEMP, GPU_Mem, GPU_Power, GPU_Util, GPU_Name
# Peak memory bandwidth per GPU, in bytes/s.
MEM_BW_DICT = {
"NVIDIA-A100-PCIe-80GB": 1935e9,
"NVIDIA-A100-SXM4-80GB": 2039e9,
"NVIDIA-H100-PCIe-80GB": 2039e9,
"NVIDIA-RTX-A5000-24GB": 768e9,
"NVIDIA-RTX-A6000-48GB": 768e9,
}
# Peak throughput per GPU and precision, in FLOP/s (datasheet tensor peaks).
PEAK_FLOPS_DICT = {
"float32":{
"NVIDIA-A100-PCIe-80GB": 312e12,
"NVIDIA-A100-SXM4-80GB": 312e12,
"NVIDIA-H100-PCIe-80GB": 756e12,
"NVIDIA-RTX-A5000-24GB": 222.2e12,
"NVIDIA-RTX-A6000-48GB": 309.7e12
},
"float16":{
"NVIDIA-A100-PCIe-80GB": 624e12,
"NVIDIA-A100-SXM4-80GB": 624e12,
"NVIDIA-H100-PCIe-80GB": 1513e12,
"NVIDIA-RTX-A5000-24GB": 222.2e12,
"NVIDIA-RTX-A6000-48GB": 309.7e12
},
"bfloat16":{
"NVIDIA-A100-PCIe-80GB": 624e12,
"NVIDIA-A100-SXM4-80GB": 624e12,
"NVIDIA-H100-PCIe-80GB": 1513e12,
"NVIDIA-RTX-A5000-24GB": 222.2e12,
"NVIDIA-RTX-A6000-48GB": 309.7e12
},
"int8":{
"NVIDIA-A100-PCIe-80GB": 1248e12,
"NVIDIA-A100-SXM4-80GB": 1248e12,
"NVIDIA-H100-PCIe-80GB": 3026e12,
"NVIDIA-RTX-A5000-24GB": 222.2e12,
"NVIDIA-RTX-A6000-48GB": 309.7e12
},
"fp8":{
"NVIDIA-A100-PCIe-80GB": 1248e12,
"NVIDIA-A100-SXM4-80GB": 1248e12,
"NVIDIA-H100-PCIe-80GB": 3026e12,
"NVIDIA-RTX-A5000-24GB": 0,
"NVIDIA-RTX-A6000-48GB": 0
},
"fp4": {
"NVIDIA-A100-PCIe-80GB": 1248e12,
"NVIDIA-A100-SXM4-80GB": 1248e12,
"NVIDIA-H100-PCIe-80GB": 3026e12,
"NVIDIA-RTX-A5000-24GB": 0,
"NVIDIA-RTX-A6000-48GB": 0
},
"int4": {
"NVIDIA-A100-PCIe-80GB": 1248e12,
"NVIDIA-A100-SXM4-80GB": 1248e12,
"NVIDIA-H100-PCIe-80GB": 3026e12,
"NVIDIA-RTX-A5000-24GB": 222.2e12,
"NVIDIA-RTX-A6000-48GB": 309.7e12
}
}
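# Note: the A100/H100 tensor peaks above match NVIDIA's datasheet numbers
# with 2:1 structured sparsity (e.g. 624 TFLOPS FP16 on A100); the S-MFU
# formulas below divide `peak_flops_tf` by 2 to recover the dense peak.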
def my_snapshot_download(repo_id, revision, local_dir, repo_type, max_workers):
    """Download a Hub snapshot, retrying up to 10 times on transient failures."""
    for _ in range(10):
        try:
            snapshot_download(
                repo_id=repo_id, revision=revision, local_dir=local_dir, repo_type=repo_type, max_workers=max_workers
            )
            return
        except Exception as e:
            print(f"Failed to download {repo_id} at {revision} with error: {e}. Retrying...")
            time.sleep(60)
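# Usage sketch (repo id and paths are illustrative, not from this project):
#   my_snapshot_download(
#       repo_id="open-llm-leaderboard/requests", revision="main",
#       local_dir="./eval-queue", repo_type="dataset", max_workers=8,
#   )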
def get_dataset_url(row):
dataset_name = row["Benchmark"]
dataset_url = row["Dataset Link"]
benchmark = f'<a target="_blank" href="{dataset_url}" style="color: var(--link-text-color); text-decoration: underline;text-decoration-style: dotted;">{dataset_name}</a>'
return benchmark
def get_dataset_summary_table(file_path):
df = pd.read_csv(file_path)
df["Benchmark"] = df.apply(lambda x: get_dataset_url(x), axis=1)
df = df[["Category", "Benchmark", "Data Split", "Data Size", "Language"]]
return df
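# The CSV read by get_dataset_summary_table must provide at least the columns
# referenced above, e.g. (illustrative):
#   Category,Benchmark,Dataset Link,Data Split,Data Size,Language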
def parse_nvidia_smi():
visible_devices = os.getenv('CUDA_VISIBLE_DEVICES', None)
if visible_devices is not None:
gpu_indices = visible_devices.split(',')
else:
# Query all GPU indices if CUDA_VISIBLE_DEVICES is not set
result = subprocess.run(['nvidia-smi', '--query-gpu=index', '--format=csv,noheader'], capture_output=True, text=True)
if result.returncode != 0:
print("Failed to query GPU indices.")
return []
gpu_indices = result.stdout.strip().split('\n')
# print(f"gpu_indices: {gpu_indices}")
gpu_stats = []
    # Matches the stats row of nvidia-smi's table: temp, power, memory, util.
    gpu_info_pattern = re.compile(r'(\d+)C\s+P\d+\s+(\d+)W\s*/\s*\d+W\s*\|\s*(\d+)MiB\s*/\s*\d+MiB\s*\|\s*(\d+)%')
    gpu_name_pattern = re.compile(r'NVIDIA\s+(RTX\s+)?([A-Z0-9]+)')
gpu_name = ""
for index in gpu_indices:
result = subprocess.run(['nvidia-smi', '-i', index], capture_output=True, text=True)
output = result.stdout.strip()
lines = output.split("\n")
for line in lines:
match = gpu_info_pattern.search(line)
name_match = gpu_name_pattern.search(line)
gpu_info = {}
if name_match:
gpu_name = ''.join(filter(None, name_match.groups())).strip()
if match:
temp, power_usage, mem_usage, gpu_util = map(int, match.groups())
gpu_info.update({
GPU_TEMP: temp,
GPU_Power: power_usage,
GPU_Mem: round(mem_usage / 1024, 2),
GPU_Util: gpu_util
})
if len(gpu_info) >= 4:
gpu_stats.append(gpu_info)
# print(f"gpu_stats: {gpu_stats}")
gpu_name = f"{len(gpu_stats)}x{gpu_name}"
gpu_stats_total = {
GPU_TEMP: 0,
GPU_Power: 0,
GPU_Mem: 0,
GPU_Util: 0,
GPU_Name: gpu_name
}
for gpu_stat in gpu_stats:
gpu_stats_total[GPU_TEMP] += gpu_stat[GPU_TEMP]
gpu_stats_total[GPU_Power] += gpu_stat[GPU_Power]
gpu_stats_total[GPU_Mem] += gpu_stat[GPU_Mem]
gpu_stats_total[GPU_Util] += gpu_stat[GPU_Util]
gpu_stats_total[GPU_Mem] = gpu_stats_total[GPU_Mem] # G
gpu_stats_total[GPU_TEMP] /= len(gpu_stats)
gpu_stats_total[GPU_Power] /= len(gpu_stats)
gpu_stats_total[GPU_Util] /= len(gpu_stats)
return [gpu_stats_total]
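# For reference, gpu_info_pattern matches the stats row of nvidia-smi's
# default table output, e.g. (illustrative):
#   | 43C    P0    68W / 300W |   1024MiB / 81920MiB |     12%      Default |
# capturing temperature (43), power draw (68), used memory (1024) and
# utilization (12).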
def monitor_gpus(stop_event, interval, stats_list):
    """Poll nvidia-smi every `interval` seconds until `stop_event` is set."""
    while not stop_event.is_set():
        gpu_stats = parse_nvidia_smi()
        if gpu_stats:
            stats_list.extend(gpu_stats)
        stop_event.wait(interval)
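# Minimal usage sketch for the monitor (assumes a daemon thread; names are
# illustrative):
#   import threading
#   stop_event, stats = threading.Event(), []
#   t = threading.Thread(target=monitor_gpus, args=(stop_event, 5.0, stats), daemon=True)
#   t.start()
#   ...  # run the workload being profiled
#   stop_event.set(); t.join()
#   summary = analyze_gpu_stats(stats)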
def analyze_gpu_stats(stats_list):
# Check if the stats_list is empty, and return None if it is
if not stats_list:
return None
# Initialize dictionaries to store the stats
avg_stats = {}
max_stats = {}
# Calculate average stats, excluding 'GPU_Mem'
for key in stats_list[0].keys():
if key != GPU_Mem and key != GPU_Name:
total = sum(d[key] for d in stats_list)
avg_stats[key] = total / len(stats_list)
# Calculate max stats for 'GPU_Mem'
max_stats[GPU_Mem] = max(d[GPU_Mem] for d in stats_list)
if GPU_Name in stats_list[0]:
avg_stats[GPU_Name] = stats_list[0][GPU_Name]
# Update average stats with max GPU memory usage
avg_stats.update(max_stats)
return avg_stats
def get_gpu_details():
    """Return the first GPU's name, normalized to match the spec-dict keys above."""
    gpus = GPUtil.getGPUs()
    if not gpus:
        raise RuntimeError("No GPU detected by GPUtil.")
    gpu = gpus[0]
    name = gpu.name.replace(" ", "-")
    memory_gb = round(gpu.memoryTotal / 1024)
    memory = f"{memory_gb}GB"
    # Strip any memory-size suffix already embedded in the name, then append ours.
    for part in name.split('-'):
        if part.endswith("GB") and part[:-2].isdigit():
            name = name.replace(f"-{part}", "").replace(part, "")
    formatted_name = f"{name}-{memory}"
    return formatted_name
def get_peak_bw(gpu_name):
    """Peak memory bandwidth for `gpu_name`, in bytes/s."""
    return MEM_BW_DICT[gpu_name]
def get_peak_flops(gpu_name, precision):
    """Peak FLOP/s for `gpu_name` at the given numeric precision."""
    return PEAK_FLOPS_DICT[precision][gpu_name]
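# Example lookups for an A100 PCIe node:
#   get_peak_bw("NVIDIA-A100-PCIe-80GB")                -> 1.935e12 (B/s)
#   get_peak_flops("NVIDIA-A100-PCIe-80GB", "float16")  -> 6.24e14 (FLOP/s)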
def _calculate_batch_metrics(outputs, decoding_tp, n_layers, d_model,
n_attn_heads, d_head, n_kv_heads, n_experts_per_tok, d_ff,
avg_activated_experts, hf_config, num_gpus, model_name,
used_dtype, batch_size, precision):
"""Calculate metrics for a batch of outputs"""
gpu_type = get_gpu_details()
hardware_specs = {
"peak_bandwidth_tb": get_peak_bw(gpu_type) / 1e12,
"peak_flops_tf": get_peak_flops(gpu_type, precision=used_dtype) / 1e12,
}
kvs = []
true_kvs = []
attn_score = []
# Calculate KV sizes
per_token_kv_size = 2 * n_layers * d_head * n_kv_heads # Default calculation
if "DeepSeek" in model_name:
if hasattr(hf_config, "kv_lora_rank") and hasattr(hf_config, "qk_rope_head_dim"):
per_token_kv_size = n_layers * (hf_config.kv_lora_rank + hf_config.qk_rope_head_dim)
# Process each output
for x in outputs:
output_len = len(x.outputs[0].token_ids)
context_prefill_size = len(x.prompt_token_ids)
# Calculate attention scores
if "DeepSeek" in model_name and hasattr(hf_config, "qk_rope_head_dim") and hasattr(hf_config, "qk_nope_head_dim") and hasattr(hf_config, "v_head_dim"):
q_head_dim = hf_config.qk_rope_head_dim + hf_config.qk_nope_head_dim
origin_per_token_k_state_size = n_layers * n_attn_heads * q_head_dim
origin_per_token_v_state_size = n_layers * n_attn_heads * hf_config.v_head_dim
attention_score = context_prefill_size * origin_per_token_k_state_size + (output_len - 1) * origin_per_token_k_state_size / 2
attention_score += context_prefill_size * origin_per_token_v_state_size + (output_len - 1) * origin_per_token_v_state_size / 2
attention_score = attention_score / 1e12
else:
origin_per_token_kv_states_size = n_layers * n_attn_heads * d_head
attention_score = context_prefill_size * origin_per_token_kv_states_size + (output_len - 1) * origin_per_token_kv_states_size / 2
attention_score = attention_score * 2 / 1e12
# Store attention scores and KV sizes
attn_score.append(attention_score)
kv_size = context_prefill_size * per_token_kv_size + (output_len - 1) * per_token_kv_size / 2
kv_size = kv_size / 1e12
        true_kv = (context_prefill_size * per_token_kv_size + output_len * per_token_kv_size) / 1e9
kvs.append(kv_size)
true_kvs.append(true_kv)
# Calculate aggregate values
kv_size = sum(kvs)
true_kv_size = sum(true_kvs) * 1e3
attention_score = sum(attn_score) / len(attn_score)
# Calculate attention size per token
if "DeepSeek" in model_name and hasattr(hf_config, "qk_rope_head_dim") and hasattr(hf_config, "qk_nope_head_dim") and hasattr(hf_config, "v_head_dim") and hasattr(hf_config, "kv_lora_rank"):
q_head_dim = hf_config.qk_rope_head_dim + hf_config.qk_nope_head_dim
if not hasattr(hf_config, "q_lora_rank") or not hf_config.q_lora_rank:
attention_size_per_token = (d_model * n_attn_heads * q_head_dim) + \
(d_model * (hf_config.kv_lora_rank + hf_config.qk_rope_head_dim)) + \
(hf_config.kv_lora_rank * n_attn_heads * (q_head_dim - hf_config.qk_rope_head_dim + hf_config.v_head_dim)) + \
(hf_config.v_head_dim * n_attn_heads * d_model)
attention_size_per_token = attention_size_per_token / 1e12
else:
attention_size_per_token = (d_model * hf_config.q_lora_rank) + \
(hf_config.q_lora_rank * n_attn_heads * q_head_dim) + \
(d_model * (hf_config.kv_lora_rank + hf_config.qk_rope_head_dim)) + \
(hf_config.kv_lora_rank * n_attn_heads * (q_head_dim - hf_config.qk_rope_head_dim + hf_config.v_head_dim)) + \
(hf_config.v_head_dim * n_attn_heads * d_model)
attention_size_per_token = attention_size_per_token / 1e12
else:
attention_size_per_token = d_model * (n_attn_heads * d_head + n_kv_heads * d_head * 2) + n_attn_heads * d_head * d_model
attention_size_per_token = attention_size_per_token / 1e12
# Calculate expert sizes
expert_size = d_ff * 3 * d_model / 1e12
    shared_experts_size_total = 0
    deepseek_dense_ffn_size = 0
    deepseek_sparse_layer_num = 0
    deepseek_num_dense_layer = 0
if "Qwen" in model_name and hasattr(hf_config, "moe_intermediate_size") and hasattr(hf_config, "shared_expert_intermediate_size"):
d_ff = hf_config.moe_intermediate_size
d_ff_share = hf_config.shared_expert_intermediate_size
shared_experts_size = d_ff_share * 3 * d_model
expert_size = d_ff * 3 * d_model
shared_experts_size_total = shared_experts_size / 1e12
expert_size = expert_size / 1e12
elif "Qwen3" in model_name and hasattr(hf_config, "moe_intermediate_size"):
d_ff = hf_config.moe_intermediate_size
expert_size = d_ff * 3 * d_model
expert_size = expert_size / 1e12
elif "DeepSeek" in model_name and hasattr(hf_config, "moe_intermediate_size") and hasattr(hf_config, "intermediate_size") and hasattr(hf_config, "first_k_dense_replace"):
d_ff = hf_config.moe_intermediate_size
d_ff_dense = hf_config.intermediate_size
deepseek_num_dense_layer = hf_config.first_k_dense_replace
        shared_experts_size = d_ff * 3 * d_model
        expert_size = d_ff * 3 * d_model
        # DeepSeek configs expose the shared-expert count; default to 2 (DeepSeek-V2).
        shared_experts = getattr(hf_config, "n_shared_experts", 2)
        shared_experts_size_total = shared_experts_size * shared_experts / 1e12
        expert_size = expert_size / 1e12
deepseek_sparse_layer_num = n_layers - deepseek_num_dense_layer
deepseek_dense_ffn_size = d_ff_dense * 3 * d_model / 1e12
# Calculate S-MBU and S-MFU
if "Qwen" in model_name and not "Qwen3" in model_name:
smbu = ((n_layers*(avg_activated_experts * expert_size + shared_experts_size_total + attention_size_per_token) +
kv_size) * precision/ (batch_size / decoding_tp)) / (num_gpus * hardware_specs['peak_bandwidth_tb'])
smfu = (n_layers * (attention_size_per_token + n_experts_per_tok * expert_size + shared_experts_size_total) + attention_score) \
* 2 * decoding_tp / (num_gpus * hardware_specs['peak_flops_tf'] / 2)
elif "Qwen3" in model_name:
smbu = ((n_layers * (avg_activated_experts * expert_size + attention_size_per_token) +
kv_size) * precision/ (batch_size / decoding_tp)) / (num_gpus * hardware_specs['peak_bandwidth_tb'])
smfu = (n_layers * (attention_size_per_token + n_experts_per_tok * expert_size) + attention_score) \
* 2 * decoding_tp / (num_gpus * hardware_specs['peak_flops_tf'] / 2)
elif "DeepSeek" in model_name:
smbu = ((n_layers * attention_size_per_token + deepseek_sparse_layer_num * \
(avg_activated_experts * expert_size + shared_experts_size_total) + \
deepseek_num_dense_layer * deepseek_dense_ffn_size + \
kv_size) * precision/ (batch_size / decoding_tp)) / (num_gpus * hardware_specs['peak_bandwidth_tb'])
smfu = (n_layers * attention_size_per_token + deepseek_sparse_layer_num * \
(n_experts_per_tok * expert_size + shared_experts_size_total) + \
deepseek_num_dense_layer * deepseek_dense_ffn_size + attention_score) \
* 2 * decoding_tp / (num_gpus * hardware_specs['peak_flops_tf'] / 2)
else:
smbu = ((n_layers*(avg_activated_experts * expert_size + attention_size_per_token) +
kv_size) * precision/ (batch_size / decoding_tp) ) / (num_gpus * hardware_specs['peak_bandwidth_tb'])
smfu = (n_layers * (attention_size_per_token + n_experts_per_tok * expert_size) + attention_score) \
* 2 * decoding_tp / (num_gpus * hardware_specs['peak_flops_tf'] / 2)
return {
'smbu': smbu,
'smfu': smfu,
'kv_size': true_kv_size,
'decoding_throughput': decoding_tp
}
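# Shape of the S-MBU / S-MFU formulas above (default branch), with one decode
# step taking batch_size / decoding_tp seconds:
#   S-MBU = (activated expert + attention weights + KV read) * bytes_per_param
#           / step_time / (num_gpus * peak_bandwidth)
#   S-MFU = 2 * (activated weights + mean attention KV term) * decoding_tp
#           / (num_gpus * dense peak FLOPs), dense peak = peak_flops_tf / 2.
# Note the asymmetry: S-MBU uses the measured avg_activated_experts (unique
# experts touched drive bytes moved), while S-MFU uses n_experts_per_tok
# (routed experts per token drive FLOPs).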
def _calculate_batch_metrics_sglang(outputs, decoding_tp, n_layers, d_model,
n_attn_heads, d_head, n_kv_heads, n_experts_per_tok, d_ff,
avg_activated_experts, hf_config, num_gpus, model_name,
used_dtype, batch_size, precision, ttft=None, prefill_tp=None):
"""Calculate metrics for a batch of outputs"""
# Initialize hardware specs and output lists
hardware_specs = _get_hardware_specs(used_dtype)
output_data = _extract_output_data(outputs)
# Calculate model-specific sizes
per_token_kv_size = _calculate_kv_size(model_name, hf_config, n_layers, d_head, n_kv_heads)
attention_size_per_token = _calculate_attention_size(model_name, hf_config, d_model, n_attn_heads, d_head, n_kv_heads)
expert_config = _calculate_expert_config(model_name, hf_config, d_ff, d_model, n_layers)
# Process outputs and calculate metrics
metrics_data = _process_outputs(output_data, per_token_kv_size, attention_size_per_token,
model_name, hf_config, n_layers, n_attn_heads, d_head)
# Calculate throughput metrics
if ttft is None or prefill_tp is None:
ttft, prefill_tp = _calculate_throughput_metrics(batch_size, output_data['prefill_lengths'],
output_data['max_duration'])
# Calculate S-MBU and S-MFU
smbu_smfu_metrics = _calculate_smbu_smfu(model_name, n_layers, attention_size_per_token,
expert_config, avg_activated_experts, metrics_data,
hardware_specs, num_gpus, precision, ttft, prefill_tp,
batch_size, decoding_tp)
return {
'prefill_smbu': smbu_smfu_metrics['prefill_smbu'],
'prefill_smfu': smbu_smfu_metrics['prefill_smfu'],
'decoding_smbu': smbu_smfu_metrics['decoding_smbu'],
'decoding_smfu': smbu_smfu_metrics['decoding_smfu'],
'kv_size': metrics_data['true_kv_size'],
'decoding_throughput': decoding_tp,
'prefill_tp': prefill_tp,
'ttft': ttft
}
def _get_hardware_specs(used_dtype):
"""Get hardware specifications"""
gpu_type = get_gpu_details()
return {
"peak_bandwidth_tb": get_peak_bw(gpu_type) / 1e12,
"peak_flops_tf": get_peak_flops(gpu_type, precision=used_dtype) / 1e12,
}
def _extract_output_data(outputs):
"""Extract relevant data from outputs"""
prefill_lengths = []
output_lengths = []
max_duration = 0.0
for x in outputs:
output_lengths.append(x['meta_info']['completion_tokens'])
prefill_lengths.append(x['meta_info']['prompt_tokens'])
max_duration = max(max_duration, x['meta_info']['e2e_latency'])
return {
'prefill_lengths': prefill_lengths,
'output_lengths': output_lengths,
'max_duration': max_duration
}
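# Each sglang output is assumed to carry a meta_info dict of this shape
# (values illustrative):
#   {"meta_info": {"completion_tokens": 128, "prompt_tokens": 512,
#                  "e2e_latency": 3.2}}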
def _calculate_kv_size(model_name, hf_config, n_layers, d_head, n_kv_heads):
"""Calculate per-token KV size based on model type"""
if "DeepSeek" in model_name and hasattr(hf_config, "kv_lora_rank") and hasattr(hf_config, "qk_rope_head_dim"):
return n_layers * (hf_config.kv_lora_rank + hf_config.qk_rope_head_dim)
return 2 * n_layers * d_head * n_kv_heads
def _calculate_attention_size(model_name, hf_config, d_model, n_attn_heads, d_head, n_kv_heads):
"""Calculate attention size per token based on model type"""
if ("DeepSeek" in model_name and
hasattr(hf_config, "qk_rope_head_dim") and
hasattr(hf_config, "qk_nope_head_dim") and
hasattr(hf_config, "v_head_dim") and
hasattr(hf_config, "kv_lora_rank")):
return _calculate_deepseek_attention_size(hf_config, d_model, n_attn_heads)
return (d_model * (n_attn_heads * d_head + n_kv_heads * d_head * 2) +
n_attn_heads * d_head * d_model) / 1e12
def _calculate_deepseek_attention_size(hf_config, d_model, n_attn_heads):
"""Calculate DeepSeek-specific attention size"""
q_head_dim = hf_config.qk_rope_head_dim + hf_config.qk_nope_head_dim
base_size = ((d_model * (hf_config.kv_lora_rank + hf_config.qk_rope_head_dim)) +
(hf_config.kv_lora_rank * n_attn_heads *
(q_head_dim - hf_config.qk_rope_head_dim + hf_config.v_head_dim)) +
(hf_config.v_head_dim * n_attn_heads * d_model))
if hasattr(hf_config, "q_lora_rank") and hf_config.q_lora_rank:
q_size = (d_model * hf_config.q_lora_rank +
hf_config.q_lora_rank * n_attn_heads * q_head_dim)
else:
q_size = d_model * n_attn_heads * q_head_dim
return (base_size + q_size) / 1e12
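# The terms above follow DeepSeek's multi-head latent attention (MLA): a
# down-projection of the hidden state to kv_lora_rank (+ rope dims),
# up-projections from the latent back to per-head K/V, the attention output
# projection, and a query path that is low-rank only when q_lora_rank is set.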
def _calculate_expert_config(model_name, hf_config, d_ff, d_model, n_layers):
"""Calculate expert configuration based on model type"""
config = {
'expert_size': d_ff * 3 * d_model / 1e12,
'shared_experts_size_total': 0,
'deepseek_dense_ffn_size': 0,
'deepseek_sparse_layer_num': 0,
'deepseek_num_dense_layer': 0
}
if "Qwen" in model_name and not "Qwen3" in model_name:
config.update(_get_qwen_expert_config(hf_config, d_model))
elif "Qwen3" in model_name:
config.update(_get_qwen3_expert_config(hf_config, d_model))
elif "DeepSeek" in model_name:
config.update(_get_deepseek_expert_config(hf_config, d_model, n_layers))
return config
def _get_qwen_expert_config(hf_config, d_model):
"""Get Qwen-specific expert configuration"""
if (hasattr(hf_config, "moe_intermediate_size") and
hasattr(hf_config, "shared_expert_intermediate_size")):
return {
'expert_size': hf_config.moe_intermediate_size * 3 * d_model / 1e12,
'shared_experts_size_total': hf_config.shared_expert_intermediate_size * 3 * d_model / 1e12
}
return {}
def _get_qwen3_expert_config(hf_config, d_model):
"""Get Qwen3-specific expert configuration"""
if hasattr(hf_config, "moe_intermediate_size"):
return {
'expert_size': hf_config.moe_intermediate_size * 3 * d_model / 1e12
}
return {}
def _get_deepseek_expert_config(hf_config, d_model, n_layers):
"""Get DeepSeek-specific expert configuration"""
if (hasattr(hf_config, "moe_intermediate_size") and
hasattr(hf_config, "intermediate_size") and
hasattr(hf_config, "first_k_dense_replace")):
deepseek_num_dense_layer = hf_config.first_k_dense_replace
        return {
            'expert_size': hf_config.moe_intermediate_size * 3 * d_model / 1e12,
            # Shared-expert count from the config; default to 2 (DeepSeek-V2).
            'shared_experts_size_total': hf_config.moe_intermediate_size * 3 * d_model
                                         * getattr(hf_config, "n_shared_experts", 2) / 1e12,
            'deepseek_dense_ffn_size': hf_config.intermediate_size * 3 * d_model / 1e12,
            'deepseek_sparse_layer_num': n_layers - deepseek_num_dense_layer,
            'deepseek_num_dense_layer': deepseek_num_dense_layer
        }
return {}
def _process_outputs(output_data, per_token_kv_size, attention_size_per_token,
model_name, hf_config, n_layers, n_attn_heads, d_head):
"""Process outputs to calculate KV sizes and attention scores"""
kvs = []
true_kvs = []
attn_scores = []
for prefill_len, output_len in zip(output_data['prefill_lengths'], output_data['output_lengths']):
# Calculate attention score
attn_score = _calculate_attention_score(model_name, hf_config, prefill_len, output_len,
n_layers, n_attn_heads, d_head)
attn_scores.append(attn_score)
# Calculate KV sizes
kv_size = (prefill_len * per_token_kv_size + (output_len - 1) * per_token_kv_size / 2) / 1e12
true_kv = (prefill_len * per_token_kv_size + output_len * per_token_kv_size) / 1e9
kvs.append(kv_size)
true_kvs.append(true_kv)
return {
'kv_size': sum(kvs),
'true_kv_size': sum(true_kvs) * 1e3,
'attention_score': sum(attn_scores) / len(attn_scores)
}
def _calculate_attention_score(model_name, hf_config, prefill_len, output_len,
n_layers, n_attn_heads, d_head):
"""Calculate attention score for a single output"""
if ("DeepSeek" in model_name and
hasattr(hf_config, "qk_rope_head_dim") and
hasattr(hf_config, "qk_nope_head_dim") and
hasattr(hf_config, "v_head_dim")):
q_head_dim = hf_config.qk_rope_head_dim + hf_config.qk_nope_head_dim
k_size = n_layers * n_attn_heads * q_head_dim
v_size = n_layers * n_attn_heads * hf_config.v_head_dim
score = (prefill_len * k_size + (output_len - 1) * k_size / 2 +
prefill_len * v_size + (output_len - 1) * v_size / 2)
else:
kv_size = n_layers * n_attn_heads * d_head
score = (prefill_len * kv_size + (output_len - 1) * kv_size / 2) * 2
return score / 1e12
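# The (output_len - 1) / 2 factor is the mean number of generated tokens in
# context across the decode phase (sum(0..output_len-1) / output_len), so the
# score approximates the average KV volume attended per generated token.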
def _calculate_throughput_metrics(batch_size, prefill_lengths, max_duration):
    """Estimate prefill throughput and TTFT from batch-level timings"""
    total_prefill = sum(prefill_lengths)
    prefill_tp = total_prefill / max_duration
    # Approximation: amortize the batch's end-to-end latency evenly per request.
    ttft = max_duration / batch_size
    return ttft, prefill_tp
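# Worked example: batch_size=8, prefill lengths summing to 4096 tokens,
# max_duration=2.0 s  ->  prefill_tp = 2048 tok/s, ttft ~= 0.25 s per request.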
def _calculate_smbu_smfu(model_name, n_layers, attention_size_per_token, expert_config,
                         avg_activated_experts, metrics_data, hardware_specs, num_gpus,
                         precision, ttft, prefill_tp, batch_size, decoding_tp):
    """Calculate S-MBU and S-MFU metrics"""
    # avg_activated_experts[0] is unused here; index 1 holds the prefill-phase
    # activation count and the remaining entries are per-decode-step counts.
    prefill_activation = avg_activated_experts[1]
    decode_steps_activation = avg_activated_experts[2:]
# Calculate prefill metrics
prefill_smbu, prefill_smfu = _calculate_prefill_metrics(
model_name, n_layers, attention_size_per_token, expert_config,
prefill_activation, metrics_data['attention_score'], hardware_specs,
num_gpus, precision, ttft, prefill_tp
)
# Calculate decoding metrics
decoding_smbu, decoding_smfu = _calculate_decoding_metrics(
model_name, n_layers, attention_size_per_token, expert_config,
decode_steps_activation, metrics_data, hardware_specs,
num_gpus, precision, batch_size, decoding_tp
)
return {
'prefill_smbu': prefill_smbu,
'prefill_smfu': prefill_smfu,
'decoding_smbu': decoding_smbu,
'decoding_smfu': decoding_smfu
}
def _calculate_prefill_metrics(model_name, n_layers, attention_size_per_token, expert_config,
prefill_activation, attention_score, hardware_specs,
num_gpus, precision, ttft, prefill_tp):
"""Calculate prefill S-MBU and S-MFU"""
model_calculators = {
'Qwen': _calculate_qwen_prefill,
'Qwen3': _calculate_qwen3_prefill,
'DeepSeek': _calculate_deepseek_prefill
}
for model_type, calculator in model_calculators.items():
if model_type in model_name and (model_type != 'Qwen' or 'Qwen3' not in model_name):
return calculator(n_layers, attention_size_per_token, expert_config,
prefill_activation, attention_score, hardware_specs,
num_gpus, precision, ttft, prefill_tp)
# Default case
return _calculate_default_prefill(n_layers, attention_size_per_token, expert_config,
prefill_activation, attention_score, hardware_specs,
num_gpus, precision, ttft, prefill_tp)
def _calculate_decoding_metrics(model_name, n_layers, attention_size_per_token, expert_config,
decode_steps_activation, metrics_data, hardware_specs,
num_gpus, precision, batch_size, decoding_tp):
"""Calculate decoding S-MBU and S-MFU"""
decoding_smbus = []
for activation in decode_steps_activation:
if "Qwen" in model_name and "Qwen3" not in model_name:
smbu, smfu = _calculate_qwen_decoding(n_layers, attention_size_per_token, expert_config,
activation, metrics_data, hardware_specs, num_gpus,
precision, batch_size, decoding_tp)
elif "Qwen3" in model_name:
smbu, smfu = _calculate_qwen3_decoding(n_layers, attention_size_per_token, expert_config,
activation, metrics_data, hardware_specs, num_gpus,
precision, batch_size, decoding_tp)
elif "DeepSeek" in model_name:
smbu, smfu = _calculate_deepseek_decoding(n_layers, attention_size_per_token, expert_config,
activation, metrics_data, hardware_specs, num_gpus,
precision, batch_size, decoding_tp)
else:
smbu, smfu = _calculate_default_decoding(n_layers, attention_size_per_token, expert_config,
activation, metrics_data, hardware_specs, num_gpus,
precision, batch_size, decoding_tp)
        decoding_smbus.append(smbu)
    # smfu does not depend on the per-step activation, so the value from the
    # last loop iteration is representative of every decode step.
    return sum(decoding_smbus) / len(decoding_smbus), smfu
# Helper functions for specific model calculations
def _calculate_qwen_prefill(n_layers, attention_size_per_token, expert_config, prefill_activation,
attention_score, hardware_specs, num_gpus, precision, ttft, prefill_tp):
smbu_numerator = (n_layers * (prefill_activation * expert_config['expert_size'] +
expert_config['shared_experts_size_total'] +
attention_size_per_token)) * precision / ttft
smbu = smbu_numerator / (num_gpus * hardware_specs['peak_bandwidth_tb'])
smfu_numerator = (n_layers * (attention_size_per_token + expert_config['expert_size'] +
expert_config['shared_experts_size_total']) + attention_score) * 2 * prefill_tp
smfu = smfu_numerator / (num_gpus * hardware_specs['peak_flops_tf'] / 2)
return smbu, smfu
def _calculate_qwen3_prefill(n_layers, attention_size_per_token, expert_config, prefill_activation,
attention_score, hardware_specs, num_gpus, precision, ttft, prefill_tp):
smbu_numerator = (n_layers * (prefill_activation * expert_config['expert_size'] +
attention_size_per_token)) * precision / ttft
smbu = smbu_numerator / (num_gpus * hardware_specs['peak_bandwidth_tb'])
smfu_numerator = (n_layers * (attention_size_per_token + expert_config['expert_size']) +
attention_score) * 2 * prefill_tp
smfu = smfu_numerator / (num_gpus * hardware_specs['peak_flops_tf'] / 2)
return smbu, smfu
def _calculate_deepseek_prefill(n_layers, attention_size_per_token, expert_config, prefill_activation,
attention_score, hardware_specs, num_gpus, precision, ttft, prefill_tp):
smbu_numerator = ((n_layers * attention_size_per_token +
expert_config['deepseek_sparse_layer_num'] *
(prefill_activation * expert_config['expert_size'] +
expert_config['shared_experts_size_total']) +
expert_config['deepseek_num_dense_layer'] *
expert_config['deepseek_dense_ffn_size']) * precision / ttft)
smbu = smbu_numerator / (num_gpus * hardware_specs['peak_bandwidth_tb'])
smfu_numerator = ((n_layers * attention_size_per_token +
expert_config['deepseek_sparse_layer_num'] *
(expert_config['expert_size'] + expert_config['shared_experts_size_total']) +
expert_config['deepseek_num_dense_layer'] *
expert_config['deepseek_dense_ffn_size'] + attention_score) * 2 * prefill_tp)
smfu = smfu_numerator / (num_gpus * hardware_specs['peak_flops_tf'] / 2)
return smbu, smfu
def _calculate_default_prefill(n_layers, attention_size_per_token, expert_config, prefill_activation,
attention_score, hardware_specs, num_gpus, precision, ttft, prefill_tp):
# Default implementation
smbu_numerator = (n_layers * (prefill_activation * expert_config['expert_size'] +
attention_size_per_token)) * precision / ttft
smbu = smbu_numerator / (num_gpus * hardware_specs['peak_bandwidth_tb'])
smfu_numerator = (n_layers * (attention_size_per_token + expert_config['expert_size']) +
attention_score) * 2 * prefill_tp
smfu = smfu_numerator / (num_gpus * hardware_specs['peak_flops_tf'] / 2)
return smbu, smfu
def _calculate_qwen_decoding(n_layers, attention_size_per_token, expert_config, activation,
metrics_data, hardware_specs, num_gpus, precision, batch_size, decoding_tp):
smbu_numerator = ((n_layers * (activation * expert_config['expert_size'] +
expert_config['shared_experts_size_total'] +
attention_size_per_token) +
metrics_data['kv_size']) * precision / (batch_size / decoding_tp))
smbu = smbu_numerator / (num_gpus * hardware_specs['peak_bandwidth_tb'])
smfu_numerator = ((n_layers * (attention_size_per_token + expert_config['expert_size'] +
expert_config['shared_experts_size_total']) +
metrics_data['attention_score']) * 2 * decoding_tp)
smfu = smfu_numerator / (num_gpus * hardware_specs['peak_flops_tf'] / 2)
return smbu, smfu
def _calculate_qwen3_decoding(n_layers, attention_size_per_token, expert_config, activation,
metrics_data, hardware_specs, num_gpus, precision, batch_size, decoding_tp):
smbu_numerator = ((n_layers * (activation * expert_config['expert_size'] +
attention_size_per_token) +
metrics_data['kv_size']) * precision / (batch_size / decoding_tp))
smbu = smbu_numerator / (num_gpus * hardware_specs['peak_bandwidth_tb'])
smfu_numerator = ((n_layers * (attention_size_per_token + expert_config['expert_size']) +
metrics_data['attention_score']) * 2 * decoding_tp)
smfu = smfu_numerator / (num_gpus * hardware_specs['peak_flops_tf'] / 2)
return smbu, smfu
def _calculate_deepseek_decoding(n_layers, attention_size_per_token, expert_config, activation,
metrics_data, hardware_specs, num_gpus, precision, batch_size, decoding_tp):
smbu_numerator = ((n_layers * attention_size_per_token +
expert_config['deepseek_sparse_layer_num'] *
(activation * expert_config['expert_size'] +
expert_config['shared_experts_size_total']) +
expert_config['deepseek_num_dense_layer'] *
expert_config['deepseek_dense_ffn_size'] +
metrics_data['kv_size']) * precision / (batch_size / decoding_tp))
smbu = smbu_numerator / (num_gpus * hardware_specs['peak_bandwidth_tb'])
smfu_numerator = ((n_layers * attention_size_per_token +
expert_config['deepseek_sparse_layer_num'] *
(expert_config['expert_size'] + expert_config['shared_experts_size_total']) +
expert_config['deepseek_num_dense_layer'] *
expert_config['deepseek_dense_ffn_size'] +
metrics_data['attention_score']) * 2 * decoding_tp)
smfu = smfu_numerator / (num_gpus * hardware_specs['peak_flops_tf'] / 2)
return smbu, smfu
def _calculate_default_decoding(n_layers, attention_size_per_token, expert_config, activation,
metrics_data, hardware_specs, num_gpus, precision, batch_size, decoding_tp):
smbu_numerator = ((n_layers * (activation * expert_config['expert_size'] +
attention_size_per_token) +
metrics_data['kv_size']) * precision / (batch_size / decoding_tp))
smbu = smbu_numerator / (num_gpus * hardware_specs['peak_bandwidth_tb'])
smfu_numerator = ((n_layers * (attention_size_per_token + expert_config['expert_size']) +
metrics_data['attention_score']) * 2 * decoding_tp)
smfu = smfu_numerator / (num_gpus * hardware_specs['peak_flops_tf'] / 2)
return smbu, smfu
def _calculate_batch_metrics_hflm(output_len, context_prefill_size, decoding_tp, n_layers, d_model,
n_attn_heads, d_head, n_kv_heads, n_experts_per_tok, d_ff,
avg_activated_experts, hf_config, num_gpus, model_name,
used_dtype, batch_size, precision):
"""Calculate metrics for a batch of outputs"""
gpu_type = get_gpu_details()
hardware_specs = {
"peak_bandwidth_tb": get_peak_bw(gpu_type) / 1e12,
"peak_flops_tf": get_peak_flops(gpu_type, precision=used_dtype) / 1e12,
}
# Calculate KV sizes
per_token_kv_size = 2 * n_layers * d_head * n_kv_heads # Default calculation
if "DeepSeek" in model_name:
if hasattr(hf_config, "kv_lora_rank") and hasattr(hf_config, "qk_rope_head_dim"):
per_token_kv_size = n_layers * (hf_config.kv_lora_rank + hf_config.qk_rope_head_dim)
# Calculate attention scores
if "DeepSeek" in model_name and hasattr(hf_config, "qk_rope_head_dim") and hasattr(hf_config, "qk_nope_head_dim") and hasattr(hf_config, "v_head_dim"):
q_head_dim = hf_config.qk_rope_head_dim + hf_config.qk_nope_head_dim
origin_per_token_k_state_size = n_layers * n_attn_heads * q_head_dim
origin_per_token_v_state_size = n_layers * n_attn_heads * hf_config.v_head_dim
attention_score = context_prefill_size * origin_per_token_k_state_size + (output_len - 1) * origin_per_token_k_state_size / 2
attention_score += context_prefill_size * origin_per_token_v_state_size + (output_len - 1) * origin_per_token_v_state_size / 2
attention_score = attention_score / 1e12
else:
origin_per_token_kv_states_size = n_layers * n_attn_heads * d_head
attention_score = context_prefill_size * origin_per_token_kv_states_size + (output_len - 1) * origin_per_token_kv_states_size / 2
attention_score = attention_score * 2 / 1e12
# Store attention scores and KV sizes
kv_size = context_prefill_size * per_token_kv_size + (output_len - 1) * per_token_kv_size / 2
kv_size = kv_size / 1e12
    true_kv = (context_prefill_size * per_token_kv_size + output_len * per_token_kv_size) / 1e9
# Calculate aggregate values
kv_size = kv_size * batch_size
true_kv_size = true_kv * batch_size * 1e3
# Calculate attention size per token
if "DeepSeek" in model_name and hasattr(hf_config, "qk_rope_head_dim") and hasattr(hf_config, "qk_nope_head_dim") and hasattr(hf_config, "v_head_dim") and hasattr(hf_config, "kv_lora_rank"):
q_head_dim = hf_config.qk_rope_head_dim + hf_config.qk_nope_head_dim
if not hasattr(hf_config, "q_lora_rank") or not hf_config.q_lora_rank:
attention_size_per_token = (d_model * n_attn_heads * q_head_dim) + \
(d_model * (hf_config.kv_lora_rank + hf_config.qk_rope_head_dim)) + \
(hf_config.kv_lora_rank * n_attn_heads * (q_head_dim - hf_config.qk_rope_head_dim + hf_config.v_head_dim)) + \
(hf_config.v_head_dim * n_attn_heads * d_model)
attention_size_per_token = attention_size_per_token / 1e12
else:
attention_size_per_token = (d_model * hf_config.q_lora_rank) + \
(hf_config.q_lora_rank * n_attn_heads * q_head_dim) + \
(d_model * (hf_config.kv_lora_rank + hf_config.qk_rope_head_dim)) + \
(hf_config.kv_lora_rank * n_attn_heads * (q_head_dim - hf_config.qk_rope_head_dim + hf_config.v_head_dim)) + \
(hf_config.v_head_dim * n_attn_heads * d_model)
attention_size_per_token = attention_size_per_token / 1e12
else:
attention_size_per_token = d_model * (n_attn_heads * d_head + n_kv_heads * d_head * 2) + n_attn_heads * d_head * d_model
attention_size_per_token = attention_size_per_token / 1e12
# Calculate expert sizes
expert_size = d_ff * 3 * d_model / 1e12
    shared_experts_size_total = 0
    deepseek_dense_ffn_size = 0
    deepseek_sparse_layer_num = 0
    deepseek_num_dense_layer = 0
if "Qwen" in model_name and hasattr(hf_config, "moe_intermediate_size") and hasattr(hf_config, "shared_expert_intermediate_size"):
d_ff = hf_config.moe_intermediate_size
d_ff_share = hf_config.shared_expert_intermediate_size
shared_experts_size = d_ff_share * 3 * d_model
expert_size = d_ff * 3 * d_model
shared_experts_size_total = shared_experts_size / 1e12
expert_size = expert_size / 1e12
elif "Qwen3" in model_name and hasattr(hf_config, "moe_intermediate_size"):
d_ff = hf_config.moe_intermediate_size
expert_size = d_ff * 3 * d_model
expert_size = expert_size / 1e12
elif "DeepSeek" in model_name and hasattr(hf_config, "moe_intermediate_size") and hasattr(hf_config, "intermediate_size") and hasattr(hf_config, "first_k_dense_replace"):
d_ff = hf_config.moe_intermediate_size
d_ff_dense = hf_config.intermediate_size
deepseek_num_dense_layer = hf_config.first_k_dense_replace
        shared_experts_size = d_ff * 3 * d_model
        expert_size = d_ff * 3 * d_model
        shared_experts = getattr(hf_config, "n_shared_experts", 2)
        shared_experts_size_total = shared_experts_size * shared_experts / 1e12
expert_size = expert_size / 1e12
deepseek_sparse_layer_num = n_layers - deepseek_num_dense_layer
deepseek_dense_ffn_size = d_ff_dense * 3 * d_model / 1e12
# Calculate S-MBU and S-MFU
if "Qwen" in model_name:
smbu = ((n_layers*(avg_activated_experts * expert_size + shared_experts_size_total + attention_size_per_token) +
kv_size) * precision/(batch_size / decoding_tp) ) / (num_gpus * hardware_specs['peak_bandwidth_tb'])
smfu = (n_layers * (attention_size_per_token + n_experts_per_tok * expert_size + shared_experts_size_total) + attention_score) \
* 2 * decoding_tp / (num_gpus * hardware_specs['peak_flops_tf'] / 2)
elif "Qwen3" in model_name:
smbu = ((n_layers * (avg_activated_experts * expert_size + attention_size_per_token) +
kv_size) * precision/(batch_size / decoding_tp) ) / (num_gpus * hardware_specs['peak_bandwidth_tb'])
smfu = (n_layers * (attention_size_per_token + n_experts_per_tok * expert_size) + attention_score) \
* 2 * decoding_tp / (num_gpus * hardware_specs['peak_flops_tf'] / 2)
elif "DeepSeek" in model_name:
smbu = ((n_layers * attention_size_per_token + deepseek_sparse_layer_num * \
(avg_activated_experts * expert_size + shared_experts_size_total) + \
deepseek_num_dense_layer * deepseek_dense_ffn_size + \
kv_size) * precision/(batch_size / decoding_tp) ) / (num_gpus * hardware_specs['peak_bandwidth_tb'])
smfu = (n_layers * attention_size_per_token + deepseek_sparse_layer_num * \
(n_experts_per_tok * expert_size + shared_experts_size_total) + \
deepseek_num_dense_layer * deepseek_dense_ffn_size + attention_score) \
* 2 * decoding_tp / (num_gpus * hardware_specs['peak_flops_tf'] / 2)
else:
smbu = ((n_layers*(avg_activated_experts * expert_size + attention_size_per_token) +
kv_size) * precision/(batch_size / decoding_tp) ) / (num_gpus * hardware_specs['peak_bandwidth_tb'])
smfu = (n_layers * (attention_size_per_token + n_experts_per_tok * expert_size) + attention_score) \
* 2 * decoding_tp / (num_gpus * hardware_specs['peak_flops_tf'] / 2)
return {
'smbu': smbu,
'smfu': smfu,
'kv_size': true_kv_size,
'decoding_throughput': decoding_tp,
'ttft': 0
}
class ModelInfoRetriever:
def __init__(self, model_name: str, precision: str = 'float16'):
if precision not in ['float32', 'float16', 'bfloat16', 'int8', 'int4', 'awq', 'gptq', 'fp8', 'fp4']:
raise ValueError("Precision must be one of ['float32', 'float16', 'bfloat16', 'int8', 'int4', 'awq', 'gptq', 'fp8', 'fp4']")
self.model_name = model_name
self.precision = precision
self.config = AutoConfig.from_pretrained(model_name, trust_remote_code=True)
self.model_type = self.config.model_type
    def get_model_precision_bits(self):
        """Returns bytes per parameter for the given precision format (bytes, despite the name)."""
if self.precision == 'float32':
return 4
if self.precision in ['float16', 'bfloat16']:
return 2
if self.precision in ['int8', 'fp8']:
return 1
if self.precision in ['int4', 'fp4', 'gptq', 'awq']:
return 0.5
raise ValueError(f"Unsupported precision: {self.precision}")
    def get_attention_info(self):
        """Returns attention-related info"""
        # Compute the head_dim fallback lazily: getattr defaults are evaluated
        # eagerly, so inlining `hidden_size // num_attention_heads` would raise
        # on configs lacking `hidden_size` even when `head_dim` is present.
        head_dim = getattr(self.config, "head_dim", None)
        if head_dim is None:
            hidden_size = getattr(self.config, "hidden_size", None)
            n_heads = getattr(self.config, "num_attention_heads", None)
            head_dim = hidden_size // n_heads if hidden_size and n_heads else None
        return {
            'num_attention_heads': getattr(self.config, "num_attention_heads", None),
            'num_key_value_heads': getattr(self.config, "num_key_value_heads", getattr(self.config, "num_kv_heads", None)),
            'head_dim': head_dim
        }
    def get_rope_info(self):
        """Returns RoPE (rotary embedding) info if available"""
        # `rope_scaling` is often present but set to None; guard accordingly.
        rope_scaling = getattr(self.config, "rope_scaling", None)
        if rope_scaling:
            return {
                "type": rope_scaling.get("type"),
                "factor": rope_scaling.get("factor")
            }
        elif hasattr(self.config, "use_alibi"):
            return {"type": "alibi", "enabled": self.config.use_alibi}
        else:
            return {"type": "none"}
def get_moe_info(self, d_model=None):
"""Returns MoE configuration such as number of experts and FFN dim"""
if d_model is None:
d_model = getattr(self.config, "hidden_size", None)
num_experts = (
getattr(self.config, "num_local_experts", None) or
getattr(self.config, "num_experts", None) or
getattr(self.config, "n_routed_experts", None) or
getattr(getattr(self.config, "ffn_config", {}), "moe_num_experts", None) or
1
)
n_experts_per_tok = (
getattr(self.config, "num_experts_per_tok", None) or
getattr(self.config, "num_selected_experts", None) or
getattr(getattr(self.config, "ffn_config", {}), "moe_top_k", None) or
1
)
        # Ratio-based estimate comes last and only applies when d_model is known.
        d_ff = (
            getattr(self.config, "ffn_dim", None) or
            getattr(self.config, "intermediate_size", None) or
            getattr(self.config, "d_ff", None) or
            getattr(getattr(self.config, "ffn_config", {}), "ffn_hidden_size", None) or
            (d_model * getattr(self.config, "ff_ratio", 4) if d_model else None)
        )
return {
"num_experts": num_experts,
"experts_per_token": n_experts_per_tok,
"ffn_dim": d_ff
}
def get_architecture_info(self):
"""Returns model-wide architecture info"""
return {
"model_type": self.model_type,
"hidden_size": getattr(self.config, "hidden_size", None),
"num_hidden_layers": getattr(self.config, "num_hidden_layers", None),
"max_position_embeddings": getattr(self.config, "max_position_embeddings", None),
"vocab_size": getattr(self.config, "vocab_size", None),
"architectures": getattr(self.config, "architectures", []),
}
def summarize(self):
"""Aggregate all extracted info in a dictionary"""
d_model = getattr(self.config, "hidden_size", None)
return {
"model_name": self.model_name,
"model_type": self.model_type,
"precision_bits": self.get_model_precision_bits(),
"architecture": self.get_architecture_info(),
"attention": self.get_attention_info(),
"rope": self.get_rope_info(),
"moe": self.get_moe_info(d_model)
}
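# Usage sketch (model id is illustrative):
#   info = ModelInfoRetriever("Qwen/Qwen2-57B-A14B-Instruct", precision="bfloat16")
#   moe = info.get_moe_info()
#   print(moe["num_experts"], moe["experts_per_token"], moe["ffn_dim"])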
if __name__ == "__main__":
    # Quick local smoke test.
    print(analyze_gpu_stats(parse_nvidia_smi()))
    print(get_gpu_details())