# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright contributors to the vLLM-MLU project
import json
import os
import statistics
import time
from datetime import datetime

import numpy as np
import pandas as pd
import torch

from vllm.logger import init_logger
from vllm.model_executor.layers.quantization import get_quantization_config
from vllm_mlu._mlu_utils import (VLLM_DUMP_MLU_INFO_EN,
                                 VLLM_LATENCY_DEBUG_WITH_DEVICE_EN)

logger = init_logger(__name__)

# Multiply a duration in seconds by this constant to express it in
# milliseconds (and divide milliseconds by it to get seconds back).
millisecond2second_unit = 1000


class LLMMetric:

    def __init__(self) -> None:
        self.batch_size_list = []
        self.context_latency_list = []
        self.e2e_latency_list = []
        self.per_token_latency_list = [[]]
        self.per_token_latency_device_list = [[]]
        self.mm_encoder_latency_device_list = [[]]
        self.peak_memory = 0
        self.block_memory = 0
        self.num_total_gpu_blocks = 0
        self.num_total_cpu_blocks = 0
        self.num_free_gpu_blocks_list = [[]]
        self.num_free_cpu_blocks_list = [[]]
        self.num_spec_tokens = 0
        self.draft_acceptance_rate = 0.0
        self.context_latency_device = 0.0
        self.generate_latency_device = 0.0
        self.mm_encoder_latency_device = 0.0

    def reset_metric(self):
        self.batch_size_list = []
        self.context_latency_list = []
        self.e2e_latency_list = []
        self.per_token_latency_list = [[]]
        self.per_token_latency_device_list = [[]]
        self.mm_encoder_latency_device_list = [[]]
        self.num_free_gpu_blocks_list = [[]]
        self.num_free_cpu_blocks_list = [[]]
        self.num_spec_tokens = 0
        self.draft_acceptance_rate = 0.0

    @classmethod
    def get_mlu_cost_time(cls):
        torch.mlu.synchronize()
        return time.perf_counter()

    def is_prefill_stage(self):
        return len(self.per_token_latency_list[-1]) == 0

    def update_memory_usage(self, peak_memory, block_memory,
                            num_total_gpu_blocks, num_total_cpu_blocks):
        self.peak_memory = peak_memory
        self.block_memory = block_memory
        self.num_total_gpu_blocks = num_total_gpu_blocks
        self.num_total_cpu_blocks = num_total_cpu_blocks

    def update_step_block_usage(self, num_free_gpu_blocks,
                                num_free_cpu_blocks):
        self.num_free_gpu_blocks_list[-1].append(num_free_gpu_blocks)
        self.num_free_cpu_blocks_list[-1].append(num_free_cpu_blocks)

    def update_step_latency(self, step_latency):
        if isinstance(step_latency, list):
            self.per_token_latency_list[-1].extend(step_latency)
        else:
            self.per_token_latency_list[-1].append(step_latency)

    def update_step_latency_device(self, step_latency):
        if isinstance(step_latency, list):
            self.per_token_latency_device_list[-1].extend(step_latency)
        else:
            self.per_token_latency_device_list[-1].append(step_latency)

    def update_mm_encoder_latency_device(self, step_latency):
        if isinstance(step_latency, list):
            if len(step_latency) == 0:
                return
            assert len(step_latency) == 1, (
                f"Not supported: model with multiple mm encoder steps. "
                f"Got {len(step_latency)} steps: {step_latency}")
            self.mm_encoder_latency_device_list[-1].extend(step_latency)
        else:
            self.mm_encoder_latency_device_list[-1].append(step_latency)

    def update_spec_decode_metrics(self, spec_decode_metrics):
        self.num_spec_tokens = spec_decode_metrics.num_spec_tokens
        self.draft_acceptance_rate = spec_decode_metrics.draft_acceptance_rate

    def add_metrics(self, batch_size, e2e_latency) -> None:
        self.batch_size_list.append(batch_size)
        self.e2e_latency_list.append(e2e_latency)
        # Open fresh per-step lists for the next iteration.
        self.per_token_latency_list.append([])
        self.per_token_latency_device_list.append([])
        self.mm_encoder_latency_device_list.append([])
        self.num_free_gpu_blocks_list.append([])
        self.num_free_cpu_blocks_list.append([])
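    # Typical recording sequence for one benchmark iteration (illustrative;
    # the variable names below are hypothetical caller-side values):
    #
    #     metric.update_step_latency(prefill_seconds)      # prefill step(s)
    #     metric.update_step_latency(decode_seconds_list)  # decode steps
    #     metric.add_metrics(batch_size, e2e_seconds)      # close iteration
    #
    # add_metrics() closes the iteration and starts fresh per-step lists,
    # which is also what is_prefill_stage() relies on.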
    def get_weight_dtype_str(self, model_path, model_dtype,
                             quantization) -> str:
        # Get the weight dtype based on the quantization config, if one
        # exists.
        if quantization == 'fp8':
            return quantization
        if quantization is not None:
            quant_method = get_quantization_config(quantization)
            # Combine the model path with the quantization config file name.
            quant_config_paths = quant_method.get_config_filenames()
            # If there are multiple quantization config files, use the first
            # one that exists.
            for quant_config_path in quant_config_paths:
                quant_config_path = os.path.join(model_path,
                                                 quant_config_path)
                # Check whether the quantization config file exists.
                if not os.path.exists(quant_config_path):
                    continue
                with open(quant_config_path, 'r') as f:
                    quant_config = json.load(f)
                quant_config = quant_method.from_config(quant_config)
                # For smoothquant and weightonly, return the quantization
                # name together with the weight bits.
                if quantization in ("smoothquant", "weightonly"):
                    return "{}-int{}".format(quant_config.get_name(),
                                             quant_config.weight_bits)
                # For other quantization methods, return the quantization
                # name only.
                return quant_config.get_name()
            # If no quantization config file exists, just return the
            # quantization name itself.
            return quantization
        # No quantization: strip the "torch." prefix from the model dtype.
        return str(model_dtype).split(".")[-1]

    def to_csv(self, filename: str, show_per_iter=False) -> None:
        if show_per_iter:
            df = pd.DataFrame(self.metrics_data)
            df = pd.DataFrame([df.iloc[-1]], columns=df.columns)
            memory_df = pd.DataFrame(self.memory_metrics_data)
            memory_df = pd.DataFrame([memory_df.iloc[-1]],
                                     columns=memory_df.columns)
        else:
            df = pd.DataFrame(self.metrics_data)
            memory_df = pd.DataFrame(self.memory_metrics_data)
        df_mean = df.mean().round(3)
        memory_df_mean = memory_df.mean().round(3)

        header = [
            "datetime",
            "model",
            "weight dtype",
            self.batch_size_name,
        ]
        header = header + list(self.mm_kwargs.keys())
        header = header + [
            "input len", "output len", "tp", self.context_latency_name,
            self.per_token_latency_name
        ]
        data = [
            datetime.now().strftime("%Y-%m-%d %H:%M:%S"), self.model,
            self.weight_dtype_str,
            int(self.metrics_data[self.batch_size_name][0])
        ]
        data = data + list(self.mm_kwargs.values())
        data = data + [
            self.input_len, self.output_len, self.tp,
            df_mean[self.context_latency_name],
            df_mean[self.per_token_latency_name]
        ]
        if self.num_spec_tokens > 0:
            header += [self.per_step_latency_name]
            data += [df_mean[self.per_step_latency_name]]
        if VLLM_LATENCY_DEBUG_WITH_DEVICE_EN:
            if self.is_v1_multimodal:
                header += [self.mm_encoder_latency_device_name]
                data += [df_mean[self.mm_encoder_latency_device_name]]
            header += [
                self.context_latency_device_name,
                self.per_token_latency_device_name
            ]
            data += [
                df_mean[self.context_latency_device_name],
                df_mean[self.per_token_latency_device_name]
            ]
        header += [
            self.e2e_latency_name, self.e2e_throughput_name,
            self.decoder_throughput_name
        ]
        if self.num_spec_tokens > 0:
            header += [self.k_name, self.acceptance_rate_name]
        header += [
            self.decode_times_name, self.peak_memory_name,
            self.block_memory_name
        ]
        data += [
            df_mean[self.e2e_latency_name],
            df_mean[self.e2e_throughput_name],
            df_mean[self.decoder_throughput_name],
        ]
        if self.num_spec_tokens > 0:
            data += [
                self.num_spec_tokens,
                df_mean[self.acceptance_rate_name],
            ]
        data += [
            df_mean[self.decode_times_name],
            memory_df_mean[self.peak_memory_name],
            memory_df_mean[self.block_memory_name],
        ]
        if VLLM_LATENCY_DEBUG_WITH_DEVICE_EN and self.save_hfu_info:
            header += [
                self.context_hfu_name, self.decoder_hfu_name,
                self.decoder_io_efficiency_name
            ]
            data += [
                df_mean[self.context_hfu_name],
                df_mean[self.decoder_hfu_name],
                df_mean[self.decoder_io_efficiency_name]
            ]
        data_dict = dict(zip(header, data))
        df_csv = pd.DataFrame(data_dict, index=[0])
        append = False
        if os.path.isfile(filename):
            try:
                df_old = pd.read_csv(filename)
                append = (df_old.columns.tolist() == header)
            except Exception as e:
                logger.info(f"Existing {filename} could not be read and "
                            f"will be overwritten: {e}")
        if append:
            df_csv.to_csv(filename, mode='a', header=False, index=False)
            logger.info(f"Metric appended to existing {filename}")
        else:
            df_csv.to_csv(filename, index=False)
            logger.info(f"Metric written to {filename}")
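    # Illustrative usage (the file name is hypothetical): after
    # calc_metric() has populated metrics_data and memory_metrics_data,
    #
    #     metric.to_csv("decode_perf.csv")
    #
    # appends a one-row summary when the existing file's header matches the
    # current column set, and rewrites the file otherwise.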
    def calc_metric(self,
                    model,
                    model_dtype,
                    metrics_idx_start,
                    only_average,
                    input_len,
                    output_len,
                    tp_nums,
                    quantization,
                    show_per_iter=False,
                    is_embedding_task=False,
                    mm_kwargs=None,
                    total_prefill_steps=1,
                    num_spec_tokens=0,
                    dp_size=1,
                    hfu_info=None,
                    io_efficiency=0.0) -> None:
        keep_digits = 2

        def round_fn(data):
            return round(data, keep_digits)

        # Exclude the trailing empty list opened by the last add_metrics().
        metrics_idx_end = len(self.per_token_latency_list) - 1
        idx_range = range(metrics_idx_start, metrics_idx_end)

        # Specify the entries to write to the CSV.
        self.is_v1_multimodal = bool(mm_kwargs)
        self.mm_kwargs = mm_kwargs if mm_kwargs else {}  # multimodal args
        self.batch_size_name = "batch size"
        self.input_len = input_len
        self.output_len = output_len
        self.tp = tp_nums
        self.dp = dp_size
        self.model = model
        self.context_latency_name = "context latency(ms)"
        self.mm_encoder_latency_device_name = \
            "multimodal encoder latency device(ms)"
        self.context_latency_device_name = "context latency device(ms)"
        if num_spec_tokens > 0:
            self.per_step_latency_name = "per step latency(ms)"
            self.per_token_latency_device_name = "per step latency device(ms)"
        else:
            self.per_token_latency_device_name = "per token latency device(ms)"
        self.per_token_latency_name = "per token latency(ms)"
        self.e2e_latency_name = "e2e latency(ms)"
        self.e2e_throughput_name = "e2e throughput(tokens/s)"
        self.decoder_throughput_name = "decoder throughput(tokens/s)"
        self.k_name = "K"
        self.acceptance_rate_name = "acceptance rate"
        self.decode_times_name = "decode times"
        self.weight_dtype_str = self.get_weight_dtype_str(
            model, model_dtype, quantization)
        self.num_spec_tokens = num_spec_tokens

        rate_list = []
        rate = 0
        if num_spec_tokens > 0:
            for i in range(metrics_idx_end):
                decode_steps = len(
                    self.per_token_latency_list[i]) - total_prefill_steps
                if decode_steps == 0:
                    logger.warning(
                        "No decode steps were recorded, so MTP acceptance "
                        "info is unavailable; increase output_len to "
                        "collect it.")
                    rate_list.append(0.0)
                else:
                    rate_list.append(
                        ((self.output_len - 1) / float(decode_steps) - 1) /
                        num_spec_tokens)
            rate = statistics.fmean(
                rate_list[metrics_idx_start:metrics_idx_end])
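        # Derivation of the acceptance rate above: with num_spec_tokens
        # draft tokens per step, each decode step emits between 1 and
        # (1 + num_spec_tokens) tokens, so
        #   rate = ((output_len - 1) / decode_steps - 1) / num_spec_tokens.
        # Worked example (made-up numbers): output_len=33, num_spec_tokens=1
        # and 20 decode steps give rate = (32 / 20 - 1) / 1 = 0.6, i.e. 60%
        # of draft tokens were accepted.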
        metrics_data = [
            (
                self.batch_size_name,
                [self.dp * int(self.batch_size_list[i]) for i in idx_range],
            ),
            (
                self.context_latency_name,
                [
                    round_fn(millisecond2second_unit * sum(
                        self.per_token_latency_list[i][:total_prefill_steps]))
                    for i in idx_range
                ],
            ),
            (
                self.per_token_latency_name,
                [
                    0.0 if len(self.per_token_latency_list[i]) <=
                    total_prefill_steps else round_fn(
                        statistics.fmean(
                            self.per_token_latency_list[i]
                            [total_prefill_steps:]) *
                        (len(self.per_token_latency_list[i]) -
                         total_prefill_steps) / (self.output_len - 1) *
                        millisecond2second_unit) for i in idx_range
                ],
            ),
        ]
        if num_spec_tokens > 0:
            metrics_data += [(
                self.per_step_latency_name,
                [
                    0.0 if len(self.per_token_latency_list[i]) <=
                    total_prefill_steps else round_fn(
                        statistics.fmean(self.per_token_latency_list[i]
                                         [total_prefill_steps:]) *
                        millisecond2second_unit) for i in idx_range
                ],
            )]
        metrics_data += [
            (
                self.e2e_latency_name,
                [
                    round_fn(millisecond2second_unit *
                             self.e2e_latency_list[i]) for i in idx_range
                ],
            ),
            (
                self.e2e_throughput_name,
                [
                    round_fn(self.dp * (output_len / self.e2e_latency_list[i])
                             * self.batch_size_list[i]) for i in idx_range
                ],
            ),
            (
                self.decoder_throughput_name,
                [
                    0.0 if len(self.per_token_latency_list[i]) <=
                    total_prefill_steps else round_fn(
                        self.dp * ((output_len - 1) / sum(
                            self.per_token_latency_list[i]
                            [total_prefill_steps:])) *
                        self.batch_size_list[i]) for i in idx_range
                ],
            ),
            (
                self.decode_times_name,
                [
                    0 if len(self.per_token_latency_list[i]) <=
                    total_prefill_steps else len(
                        self.per_token_latency_list[i][total_prefill_steps:])
                    for i in idx_range
                ],
            ),
        ]
        if num_spec_tokens > 0:
            metrics_data.append((self.k_name, num_spec_tokens))
            metrics_data.append((self.acceptance_rate_name,
                                 [rate_list[i] for i in idx_range]))

        insert_latency_device = VLLM_LATENCY_DEBUG_WITH_DEVICE_EN
        if insert_latency_device:
            device_item_idx = 3
            if self.is_v1_multimodal:
                mm_encoder_latency_device = [
                    round_fn(sum(self.mm_encoder_latency_device_list[i]))
                    for i in idx_range
                ]
                metrics_data.insert(device_item_idx,
                                    (self.mm_encoder_latency_device_name,
                                     mm_encoder_latency_device))
                device_item_idx = device_item_idx + 1
            context_latency_device = [
                round_fn(sum(self.per_token_latency_device_list[i]
                             [:total_prefill_steps])) for i in idx_range
            ]
            per_token_latency_device = [
                0.0 if len(self.per_token_latency_device_list[i]) <=
                total_prefill_steps else round_fn(
                    statistics.fmean(self.per_token_latency_device_list[i]
                                     [total_prefill_steps:]))
                for i in idx_range
            ]
            metrics_data.insert(
                device_item_idx,
                (self.context_latency_device_name, context_latency_device))
            metrics_data.insert(device_item_idx + 1,
                                (self.per_token_latency_device_name,
                                 per_token_latency_device))
        self.metrics_data = dict(metrics_data)

        # Print the per-iteration metrics table.
        df = pd.DataFrame(self.metrics_data)
        if show_per_iter:
            df = pd.DataFrame([df.iloc[-1]], columns=df.columns)
        else:
            df.loc["Average(" + str(metrics_idx_end - metrics_idx_start) +
                   "iters)"] = df.mean().round(keep_digits)
            if only_average:
                df = pd.DataFrame([df.iloc[-1]], columns=df.columns)
        df.index.name = 'iter index'
        df[self.batch_size_name] = df[self.batch_size_name].astype(int)
        if num_spec_tokens > 0:
            df[self.k_name] = df[self.k_name].astype(int)
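        # Note on the per-token latency column above: it is total decode
        # time divided by the (output_len - 1) decoded tokens, i.e.
        #   fmean(step_latencies) * num_steps / (output_len - 1) * 1000.
        # With speculative decoding a step can emit several tokens, so
        # per-token latency is lower than per-step latency. Worked example
        # (made-up numbers): 20 decode steps of 10 ms for 32 output tokens
        # give 200 ms / 31 tokens ~= 6.45 ms per token versus 10 ms per step.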
        self.peak_memory_name = "profile memory(GB)"
        self.block_memory_name = "total cache memory(GB)"
        memory_metrics_data = [
            (
                self.peak_memory_name,
                [
                    round_fn(self.peak_memory / 1024 / 1024 / 1024)
                    for i in idx_range
                ],
            ),
            (
                self.block_memory_name,
                [
                    round_fn(self.block_memory / 1024 / 1024 / 1024)
                    for i in idx_range
                ],
            ),
        ]
        self.memory_metrics_data = dict(memory_metrics_data)

        # Print the memory table.
        memory_df = pd.DataFrame(self.memory_metrics_data)
        if show_per_iter:
            memory_df = pd.DataFrame([memory_df.iloc[-1]],
                                     columns=memory_df.columns)
        else:
            memory_df.loc["Average(" +
                          str(metrics_idx_end - metrics_idx_start) +
                          "iters)"] = memory_df.mean().round(keep_digits)
            if only_average:
                memory_df = pd.DataFrame([memory_df.iloc[-1]],
                                         columns=memory_df.columns)
        memory_df.index.name = 'iter index'

        pd.set_option('display.colheader_justify', 'center')
        pd.set_option('display.max_columns', None)
        pd.set_option('display.max_rows', None)
        print("********************************* Test Info "
              "****************************")
        mm_params_text = " ".join(
            f"{key}:{value}" for key, value in self.mm_kwargs.items())
        print("Generation Config {} input len:{} output len:{} tp_nums:{} "
              "quantization:{}".format(mm_params_text, input_len, output_len,
                                       tp_nums, quantization))
        if insert_latency_device:
            # The device-side columns only exist when
            # VLLM_LATENCY_DEBUG_WITH_DEVICE_EN is set; guard the lookups so
            # a plain run does not raise a KeyError.
            self.context_latency_device = np.mean(
                self.metrics_data[self.context_latency_device_name])
            self.generate_latency_device = np.mean(
                self.metrics_data[self.per_token_latency_device_name])
            if self.is_v1_multimodal:
                self.mm_encoder_latency_device = np.mean(
                    self.metrics_data[self.mm_encoder_latency_device_name])
        print("************************* Performance Info "
              "******************************")
        print(f"Total prefill steps: {total_prefill_steps}")
        print(df.to_string())
        if not is_embedding_task:
            # Embedding tasks skip the profile run, so there is no memory
            # info to report.
            print(memory_df.to_string())
        if num_spec_tokens > 0:
            print("MTP token accept rate: {:.2f}%".format(rate * 100))
        self.dump_performance_info(hfu_info, io_efficiency)
        avg_latency_e2e = sum(
            sum(self.per_token_latency_list[i])
            for i in idx_range) / len(idx_range)
        print("Avg latency without host time is:", avg_latency_e2e)
        print("***********************************************************"
              "************")

        # Collect context/decoder HFU when the MLU info dump is enabled.
        self.save_hfu_info = False
        if insert_latency_device:
            if VLLM_DUMP_MLU_INFO_EN:
                try:
                    import device_info  # noqa: F401
                    self.save_hfu_info = True
                except ImportError:
                    logger.info("Importing device_info failed; "
                                "try `pip install device_info`.")
            self.context_hfu_name = "Context HFU"
            self.decoder_hfu_name = "Decoder HFU"
            self.decoder_io_efficiency_name = "Decoder IO Efficiency"
            if self.save_hfu_info:
                self.metrics_data[self.context_hfu_name] = \
                    hfu_info["context_hfu"] * 100
                self.metrics_data[self.decoder_hfu_name] = \
                    hfu_info["decoder_hfu"] * 100
                self.metrics_data[self.decoder_io_efficiency_name] = \
                    io_efficiency * 100

        if csv_path := os.getenv("OUTPUT_CSV_PATH"):
            try:
                if dir_path := os.path.dirname(csv_path):
                    os.makedirs(dir_path, exist_ok=True)
                self.to_csv(csv_path, show_per_iter=show_per_iter)
            except Exception as e:
                logger.error(f"Failed to dump metrics to OUTPUT_CSV_PATH "
                             f"{csv_path}: {e}")
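    # Example (hypothetical shell usage): persist the averaged summary row
    # across runs by pointing OUTPUT_CSV_PATH at a writable file, e.g.
    #
    #     OUTPUT_CSV_PATH=results/decode_perf.csv python benchmark.py ...
    #
    # `benchmark.py` stands in for whatever driver calls calc_metric();
    # missing parent directories of the CSV path are created automatically.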
    def dump_performance_info(self, hfu_info, io_efficiency):
        try:
            if VLLM_DUMP_MLU_INFO_EN and hfu_info is not None:
                # Normalize by the measured device time (ms -> s) to obtain
                # utilization ratios.
                hfu_info["context_hfu"] = hfu_info["context_hfu"] / (
                    self.context_latency_device / millisecond2second_unit)
                hfu_info["decoder_hfu"] = hfu_info["decoder_hfu"] / (
                    self.generate_latency_device / millisecond2second_unit)
                io_efficiency = io_efficiency / self.generate_latency_device
                print(f"Context HFU-visible: {hfu_info['context_hfu']:.3%}")
                print(f"Decoder HFU-visible: {hfu_info['decoder_hfu']:.3%}")
                print(f"Decoder IO Efficiency: {io_efficiency:.3%}")
            elif hfu_info is not None:
                print(f"Context FLOPS-visible: {hfu_info['context_flops']}")
                print(f"Decoder FLOPS-visible: {hfu_info['decoder_flops']}")
            else:
                logger.info("No performance information to dump")
        except Exception as e:
            logger.error(f"Failed to dump performance information: {e}")
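

if __name__ == "__main__":
    # Minimal illustrative smoke test (not part of the benchmark pipeline):
    # all timings and sizes below are made-up values chosen only to exercise
    # the recording API and the summary-table printing without touching an
    # MLU device.
    metric = LLMMetric()
    for _ in range(3):
        metric.update_step_latency(0.050)         # one prefill step, seconds
        metric.update_step_latency([0.010] * 31)  # 31 decode steps, seconds
        metric.add_metrics(batch_size=8, e2e_latency=0.360)
    metric.update_memory_usage(peak_memory=8 * 1024**3,
                               block_memory=4 * 1024**3,
                               num_total_gpu_blocks=1024,
                               num_total_cpu_blocks=512)
    # Skip the first iteration as warmup and report the average of the rest.
    metric.calc_metric(model="dummy-model",
                       model_dtype=torch.float16,
                       metrics_idx_start=1,
                       only_average=True,
                       input_len=128,
                       output_len=32,
                       tp_nums=1,
                       quantization=None)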