Files
enginex-mlu590-vllm/vllm_mlu/mlu_metric.py
2026-04-24 09:58:03 +08:00

413 lines
21 KiB
Python

# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright contributors to the vLLM-MLU project
import torch
import time
import statistics
import pandas as pd
import numpy as np
import json
import os
from datetime import datetime
from vllm.logger import init_logger
from vllm_mlu._mlu_utils import VLLM_LATENCY_DEBUG_WITH_DEVICE_EN, VLLM_DUMP_MLU_INFO_EN
from vllm.model_executor.layers.quantization import get_quantization_config
logger = init_logger(__name__)
millisecond2second_unit = 1000
class LLMMetric:
def __init__(self)->None:
self.batch_size_list = []
self.context_latency_list = []
self.e2e_latency_list = []
self.per_token_latency_list = [ [] ]
self.per_token_latency_device_list = [ [] ]
self.mm_encoder_latency_device_list = [ [] ]
self.peak_memory = 0
self.block_memory = 0
self.num_total_gpu_blocks = 0
self.num_total_cpu_blocks = 0
self.num_free_gpu_blocks_list = [ [] ]
self.num_free_cpu_blocks_list = [ [] ]
self.num_spec_tokens = 0
self.draft_acceptance_rate = 0.0
self.context_latency_device = 0.0
self.generate_latency_device = 0.0
self.mm_encoder_latency_device = 0.0
def reset_metric(self):
self.batch_size_list = []
self.context_latency_list = []
self.e2e_latency_list = []
self.per_token_latency_list = [ [] ]
self.per_token_latency_device_list = [ [] ]
self.mm_encoder_latency_device_list = [ [] ]
self.num_free_gpu_blocks_list = [ [] ]
self.num_free_cpu_blocks_list = [ [] ]
self.num_spec_tokens = 0
self.draft_acceptance_rate = 0.0
@classmethod
def get_mlu_cost_time(cls):
torch.mlu.synchronize()
return time.perf_counter()
def is_prefill_stage(self):
return len(self.per_token_latency_list[-1]) == 0
def update_memory_usage(self, peak_memory, block_memory, num_total_gpu_blocks, num_total_cpu_blocks):
self.peak_memory = peak_memory
self.block_memory = block_memory
self.num_total_gpu_blocks = num_total_gpu_blocks
self.num_total_cpu_blocks = num_total_cpu_blocks
def update_step_block_usage(self, num_free_gpu_blocks, num_free_cpu_blocks):
self.num_free_gpu_blocks_list[-1].append(num_free_gpu_blocks)
self.num_free_cpu_blocks_list[-1].append(num_free_cpu_blocks)
def update_step_latency(self, step_latency):
if isinstance(step_latency, list):
self.per_token_latency_list[-1].extend(step_latency)
else:
self.per_token_latency_list[-1].append(step_latency)
def update_step_latency_device(self, step_latency):
if isinstance(step_latency, list):
self.per_token_latency_device_list[-1].extend(step_latency)
else:
self.per_token_latency_device_list[-1].append(step_latency)
def update_mm_encoder_latency_device(self, step_latency):
if isinstance(step_latency, list):
if len(step_latency) == 0:
return
assert len(step_latency) == 1, f"Not supported! Model with multi mm encoder steps. {len(step_latency)} {step_latency}"
self.mm_encoder_latency_device_list[-1].extend(step_latency)
else:
self.mm_encoder_latency_device_list[-1].append(step_latency)
def update_spec_decode_metrics(self, spec_decode_metrics):
self.num_spec_tokens = spec_decode_metrics.num_spec_tokens
self.draft_acceptance_rate = spec_decode_metrics.draft_acceptance_rate
def add_metrics(self, batch_size, e2e_latency)->None:
self.batch_size_list.append(batch_size)
self.e2e_latency_list.append(e2e_latency)
self.per_token_latency_list.append([]) # new iter
self.per_token_latency_device_list.append([])
self.mm_encoder_latency_device_list.append([])
self.num_free_gpu_blocks_list.append([])
self.num_free_cpu_blocks_list.append([])
def get_weight_dtype_str(self, model_path, model_dtype, quantization) -> str:
# get weight dtype based on quantization config if exists
if quantization == 'fp8':
return quantization
if quantization is not None:
quant_method = get_quantization_config(quantization)
# combine the model path with the quantization config file name
quant_config_paths = quant_method.get_config_filenames()
# if there are multiple quantization config files, return the first one existed
for quant_config_path in quant_config_paths:
quant_config_path = os.path.join(model_path, quant_config_path)
# check if the quantization config file exists
if not os.path.exists(quant_config_path):
continue
with open(quant_config_path, 'r') as f:
quant_config = json.load(f)
quant_config = quant_method.from_config(quant_config)
# for smoothquant and weightonly, return the quantization name with the weight bits
if quantization == "smoothquant" or quantization == ["weightonly"]:
return "{}-int{}".format(quant_config.get_name(), quant_config.weight_bits)
else:
# for other quantization methods, return the quantization name
return quant_config.get_name()
# if the quantization config file does not exist, just return the quanization name
return quant_config_path.get_name()
else:
# remove the prefix of model dtype from torch config
return str(model_dtype).split(".")[-1]
def to_csv(self, filename: str, show_per_iter=False) -> None:
if show_per_iter:
df = pd.DataFrame(self.metrics_data)
df = pd.DataFrame([df.iloc[-1]], columns=df.columns)
memory_df = pd.DataFrame(self.memory_metrics_data)
memory_df = pd.DataFrame([memory_df.iloc[-1]], columns=memory_df.columns)
else:
df = pd.DataFrame(self.metrics_data)
memory_df = pd.DataFrame(self.memory_metrics_data)
df_mean = df.mean().round(3)
memory_df_mean = memory_df.mean().round(3)
header = ["datetime", "model",
"weight dtype", self.batch_size_name,
]
header = header + list(self.mm_kwargs.keys())
header = header + ["input len", "output len", "tp",
self.context_latency_name, self.per_token_latency_name]
data = [datetime.now().strftime("%Y-%m-%d %H:%M:%S"), self.model,
self.weight_dtype_str, int(self.metrics_data[self.batch_size_name][0])]
data = data + [self.mm_kwargs[k] for k in self.mm_kwargs.keys()]
data = data + [self.input_len, self.output_len, self.tp,
df_mean[self.context_latency_name], df_mean[self.per_token_latency_name]]
if self.num_spec_tokens > 0:
header += [self.per_step_latency_name]
data += [df_mean[self.per_step_latency_name]]
if VLLM_LATENCY_DEBUG_WITH_DEVICE_EN:
if self.is_v1_multimodal:
header += [self.mm_encoder_latency_device_name,]
data += [df_mean[self.mm_encoder_latency_device_name],]
header += [self.context_latency_device_name, self.per_token_latency_device_name]
data += [df_mean[self.context_latency_device_name], df_mean[self.per_token_latency_device_name]]
header += [self.e2e_latency_name, self.e2e_throughput_name, self.decoder_throughput_name,]
if self.num_spec_tokens > 0:
header += [self.k_name, self.acceptance_rate_name]
header += [self.decode_times_name,
self.peak_memory_name, self.block_memory_name]
data += [
df_mean[self.e2e_latency_name], df_mean[self.e2e_throughput_name], df_mean[self.decoder_throughput_name],
]
if self.num_spec_tokens > 0:
data += [self.num_spec_tokens, df_mean[self.acceptance_rate_name],]
data += [df_mean[self.decode_times_name], memory_df_mean[self.peak_memory_name], memory_df_mean[self.block_memory_name],]
if VLLM_LATENCY_DEBUG_WITH_DEVICE_EN and self.save_hfu_info:
header += [self.context_hfu_name, self.decoder_hfu_name, self.decoder_io_efficiency_name]
data += [
df_mean[self.context_hfu_name], df_mean[self.decoder_hfu_name],
df_mean[self.decoder_io_efficiency_name]
]
data_dict = dict(zip(header, data))
df_csv = pd.DataFrame(data_dict, index=[0])
append = False
if os.path.isfile(filename):
try:
df_old = pd.read_csv(filename)
append = (df_old.columns.tolist() == header)
except Exception as e:
logger.info(f"Existing {filename} failed to be read and will be overwritten")
if append:
df_csv.to_csv(filename, mode='a', header=False, index=False)
logger.info(f"Metric appended to existing {filename}")
else:
df_csv.to_csv(filename, index=False)
logger.info(f"Metric written to {filename}")
def calc_metric(self, model, model_dtype, metrics_idx_start, only_average,
input_len, output_len, tp_nums, quantization,
show_per_iter=False, is_embedding_task=False, mm_kwargs=None,
total_prefill_steps=1, num_spec_tokens=0, dp_size=1, hfu_info=None, io_efficiency=0.0) -> None:
keep_digits = 2
def round_fn(data):
return round(data, keep_digits)
metrics_idx_end = len(self.per_token_latency_list) - 1 # without last []
idx_range = range(metrics_idx_start, metrics_idx_end)
# specify entries to write to csv
self.is_v1_multimodal = mm_kwargs
self.mm_kwargs = mm_kwargs if mm_kwargs else {} # multimodal args
self.batch_size_name = "batch size"
self.input_len = input_len
self.output_len = output_len
self.tp = tp_nums
self.dp = dp_size
self.model = model
self.context_latency_name = "context latency(ms)"
self.mm_encoder_latency_device_name = "multimodal encoder latency device(ms)"
self.context_latency_device_name = "context latency device(ms)"
if num_spec_tokens > 0:
self.per_step_latency_name = "per step latency(ms)"
self.per_token_latency_device_name = "per step latency device(ms)"
else:
self.per_token_latency_device_name = "per token latency device(ms)"
self.per_token_latency_name = "per token latency(ms)"
self.e2e_latency_name = "e2e latency(ms)"
self.e2e_throughput_name = "e2e throughput(tokens/s)"
self.decoder_throughput_name = "decoder throughput(tokens/s)"
self.k_name = "K"
self.acceptance_rate_name = "acceptance rate"
self.decode_times_name = "decode times"
self.weight_dtype_str = self.get_weight_dtype_str(model, model_dtype, quantization)
self.num_spec_tokens = num_spec_tokens
rate_list=[]
rate=0
if num_spec_tokens > 0:
for i in range(metrics_idx_end):
if len(self.per_token_latency_list[i]) - total_prefill_steps == 0:
logger.warning("For now output_len is 0, no need mtp info, if you need mtp info, please increase output_len.")
rate_list.append(0.0)
else:
rate_list.append(((self.output_len - 1) / (float)(len(self.per_token_latency_list[i]) - total_prefill_steps) - 1) / num_spec_tokens)
rate = statistics.fmean(rate_list[metrics_idx_start: metrics_idx_end])
metrics_data = [
(
self.batch_size_name, [self.dp * int(self.batch_size_list[i]) for i in idx_range]
),
(
self.context_latency_name, [round_fn(millisecond2second_unit * sum(self.per_token_latency_list[i][:total_prefill_steps])) for i in idx_range]
),
(
self.per_token_latency_name, [
0.0 if len(self.per_token_latency_list[i]) <= total_prefill_steps else \
round_fn(statistics.fmean(self.per_token_latency_list[i][total_prefill_steps:]) * (len(self.per_token_latency_list[i]) - total_prefill_steps) / (self.output_len - 1) * millisecond2second_unit) for i in idx_range
]
),
]
if num_spec_tokens > 0:
metrics_data += [(self.per_step_latency_name, [
0.0 if len(self.per_token_latency_list[i]) <= total_prefill_steps else \
round_fn(statistics.fmean(self.per_token_latency_list[i][total_prefill_steps:]) * millisecond2second_unit) for i in idx_range
])]
metrics_data += [
(
self.e2e_latency_name, [round_fn(millisecond2second_unit * self.e2e_latency_list[i]) for i in idx_range]
),
(
self.e2e_throughput_name, [
round_fn(self.dp * (output_len / self.e2e_latency_list[i]) * self.batch_size_list[i]) \
for i in idx_range
]
),
(
self.decoder_throughput_name, [
0.0 if len(self.per_token_latency_list[i]) <= total_prefill_steps else \
round_fn(self.dp * ((output_len-1) / sum(self.per_token_latency_list[i][total_prefill_steps:])) * self.batch_size_list[i]) \
for i in idx_range
]
),
(
self.decode_times_name, [
0 if len(self.per_token_latency_list[i]) <= total_prefill_steps else \
len(self.per_token_latency_list[i][total_prefill_steps:]) for i in idx_range
]
),
]
if num_spec_tokens > 0:
metrics_data.append((self.k_name, num_spec_tokens))
metrics_data.append((self.acceptance_rate_name, [rate_list[i] for i in idx_range]))
insert_latency_device = VLLM_LATENCY_DEBUG_WITH_DEVICE_EN
if insert_latency_device:
device_item_idx = 3
if self.is_v1_multimodal:
mm_encoder_latency_device = [round_fn(sum(self.mm_encoder_latency_device_list[i])) for i in idx_range]
metrics_data.insert(device_item_idx, (self.mm_encoder_latency_device_name, mm_encoder_latency_device))
device_item_idx = device_item_idx + 1
context_latency_device = [round_fn(sum(self.per_token_latency_device_list[i][:total_prefill_steps])) for i in idx_range]
per_token_latency_device = [0.0 if len(self.per_token_latency_device_list[i]) <= total_prefill_steps else \
round_fn(statistics.fmean(self.per_token_latency_device_list[i][total_prefill_steps:])) for i in idx_range]
metrics_data.insert(device_item_idx, (self.context_latency_device_name, context_latency_device))
metrics_data.insert(device_item_idx + 1, (self.per_token_latency_device_name, per_token_latency_device))
self.metrics_data = dict(metrics_data)
# Print
df = pd.DataFrame(self.metrics_data)
if show_per_iter:
df = pd.DataFrame([df.iloc[-1]], columns=df.columns)
else:
df.loc["Average(" + str(metrics_idx_end-metrics_idx_start) + "iters)"] = df.mean().round(keep_digits)
if only_average:
df = pd.DataFrame([df.iloc[-1]], columns=df.columns)
df.index.name = 'iter index'
df[self.batch_size_name] = df[self.batch_size_name].astype(int)
if num_spec_tokens > 0:
df[self.k_name] = df[self.k_name].astype(int)
self.peak_memory_name = "profile memory(GB)"
self.block_memory_name = "total cache memory(GB)"
memory_metrics_data = [
(
self.peak_memory_name, [round_fn(self.peak_memory / 1024 / 1024 / 1024) for i in idx_range]
),
(
self.block_memory_name, [round_fn(self.block_memory / 1024 / 1024 / 1024) for i in idx_range]
),
]
self.memory_metrics_data = dict(memory_metrics_data)
# Print
memory_df = pd.DataFrame(self.memory_metrics_data)
if show_per_iter:
memory_df = pd.DataFrame([memory_df.iloc[-1]], columns=memory_df.columns)
else:
memory_df.loc["Average(" + str(metrics_idx_end-metrics_idx_start) + "iters)"] = memory_df.mean().round(keep_digits)
if only_average:
memory_df = pd.DataFrame([memory_df.iloc[-1]], columns=memory_df.columns)
memory_df.index.name = 'iter index'
pd.set_option('display.colheader_justify', 'center')
pd.set_option('display.max_columns', None)
pd.set_option('display.max_rows', None)
print("********************************* Test Info****************************")
mm_params_text = " ".join(f"{key}:{value}" for key, value in self.mm_kwargs.items())
print("Generation Config {} input len:{} output len:{} tp_nums:{} quantization:{}".format(
mm_params_text, input_len,output_len,tp_nums,quantization))
self.context_latency_device = np.mean(self.metrics_data['context latency device(ms)'])
self.generate_latency_device = np.mean(self.metrics_data[self.per_token_latency_device_name])
if self.is_v1_multimodal:
self.mm_encoder_latency_device = np.mean(self.metrics_data[self.mm_encoder_latency_device_name])
print("*************************Performance Info******************************")
print(f"Total prefill steps: {total_prefill_steps}")
print(df.to_string())
if not is_embedding_task:
# embedding task does not do profile run, so does not have memory infos
print(memory_df.to_string())
if insert_latency_device :
context_latency = np.mean(self.metrics_data['context latency device(ms)'])
generate_latency = np.mean(self.metrics_data[self.per_token_latency_device_name])
if num_spec_tokens > 0:
print("MTP token accept rate: {:.2f}%".format(rate*100))
self.dump_performance_info(hfu_info, io_efficiency)
avg_latency_e2e = sum(sum(self.per_token_latency_list[i]) for i in idx_range) / len(idx_range)
print("Avg latency without host time is :", avg_latency_e2e)
print("***********************************************************************")
# collect context_hfu and
self.save_hfu_info = False
if insert_latency_device:
if VLLM_DUMP_MLU_INFO_EN:
try:
import device_info
self.save_hfu_info = True
except:
logger.info(f"try import device_info failed. try pip install device_info.")
self.context_hfu_name = "Context HFU"
self.decoder_hfu_name = "Decoder HFU"
self.decoder_io_efficiency_name = "Decoder IO Efficiency"
if self.save_hfu_info:
self.metrics_data[self.context_hfu_name] = hfu_info["context_hfu"] * 100
self.metrics_data[self.decoder_hfu_name] = hfu_info["decoder_hfu"] * 100
self.metrics_data[self.decoder_io_efficiency_name] = io_efficiency * 100
if csv_path := os.getenv("OUTPUT_CSV_PATH"):
try:
if dir_path := os.path.dirname(csv_path):
os.makedirs(dir_path, exist_ok=True)
self.to_csv(csv_path, show_per_iter=show_per_iter)
except Exception as e:
logger.error(f"Invalid OUTPUT_CSV_PATH: {csv_path} to dump metrics, Error: {e}")
def dump_performance_info(self, hfu_info, io_efficiency):
try:
if VLLM_DUMP_MLU_INFO_EN and hfu_info != None:
hfu_info["context_hfu"] = hfu_info["context_hfu"] / (self.context_latency_device / millisecond2second_unit)
hfu_info["decoder_hfu"] = hfu_info["decoder_hfu"] / (self.generate_latency_device / millisecond2second_unit)
io_efficiency = io_efficiency / self.generate_latency_device
print(f"Context HFU-visible: {hfu_info['context_hfu']:.3%}")
print(f"Decoder HFU-visible: {hfu_info['decoder_hfu']:.3%}")
print(f"Decoder IO Efficiency: {io_efficiency:.3%}")
elif hfu_info != None:
print(f"Context FLOPS-visible: {hfu_info['context_flops']}")
print(f"Decoder FLOPS-visible: {hfu_info['decoder_flops']}")
else:
logger.info("Unsupport dump performance information")
except Exception as e:
logger.error(f"Failed to dump performance information: {str(e)}")