"""Benchmark script that times ``tmo.batch_matmul`` over a fixed set of cases."""
import argparse
import os

import torch
import torch_mlu
import torch_mlu_ops as tmo
from tabulate import tabulate

from common import benchmark_forward, save_to_csv

# One dict per benchmark case. Every case uses batch=2, m=1024, has_c=True and
# both half-precision dtypes; only the (k, n) pair changes between cases.
e2e_time_param_dict_list = [
    {
        "batch": 2,
        "m": 1024,
        "k": k_dim,
        "n": n_dim,
        "has_c": True,
        "input_dtype": [torch.float16, torch.bfloat16],
    }
    for k_dim, n_dim in (
        (1600, 6400),
        (2048, 8192),
        (4096, 11008),
        (5120, 16384),
        (6144, 24576),
    )
]


def _parse_args():
    """Build the command-line parser and return the parsed arguments."""
    cli = argparse.ArgumentParser()
    cli.add_argument("--repeat_times", type=int, default=10,
                     help="repeat times for testing")
    cli.add_argument("--csv", action="store_true",
                     help="write the report data to csv")
    cli.add_argument("-o", type=str,
                     help="specify the output folder name under --csv mode")
    return cli.parse_args()


def main():
    """Run every configured case, print a result table and optionally save it.

    For each entry of ``e2e_time_param_dict_list`` and each of its dtypes,
    random inputs are created, ``tmo.batch_matmul`` is timed through
    ``benchmark_forward`` and one row is appended to the report. The report is
    printed as a grid table; with ``--csv`` it is also handed to
    ``save_to_csv`` together with the ``-o`` value and this script's file name.
    """
    args = _parse_args()
    device = "mlu"
    header = ["batch", "m", "k", "n", "has_c", "input_dtype",
              "hardware_time(us)", "e2e_latency(us)"]
    rows = []

    for case in e2e_time_param_dict_list:
        batch, m, k, n = (case[key] for key in ("batch", "m", "k", "n"))
        has_c = case["has_c"]

        for dtype in case["input_dtype"]:
            # bfloat16 cases are skipped when the device reports no support.
            if dtype == torch.bfloat16:
                if not torch_mlu.mlu.is_bf16_supported():
                    continue

            # Inputs are generated by the default randn call, then moved to
            # the device and cast, in the order a, b, c.
            a = torch.randn(batch, m, k).to(device).to(dtype)
            # b is laid out as (batch, n, k).
            # NOTE(review): presumably consumed transposed by the op - confirm.
            b = torch.randn(batch, n, k).to(device).to(dtype)
            c = torch.randn(batch, m, n).to(device).to(dtype) if has_c else None

            # NOTE(review): the two 1.0 arguments look like scaling factors
            # for the product and for c - confirm against the tmo API.
            hardware_time, e2e_time = benchmark_forward(
                tmo.batch_matmul, a, b, c, 1.0, 1.0,
                repeats=args.repeat_times,
            )

            fields = (batch, m, k, n, has_c, dtype, hardware_time, e2e_time)
            rows.append([f"{field}" for field in fields])

    table = [header, *rows]
    print(tabulate(table, headers="firstrow", tablefmt="grid"))

    if args.csv:
        # Only the script's base name is forwarded, not its directory.
        file_name = os.path.basename(__file__)
        save_to_csv(table, args.o, file_name)


if __name__ == "__main__":
    main()