#!/usr/bin/env python3
# SPDX-License-Identifier: GPL-2.0
#
# Program to allow users to fuzz test Hyper-V drivers
# by interfacing with Hyper-V debugfs attributes.
# Current test methods available:
#       1. delay testing
#
# Current file/directory structure of hyper-V debugfs:
#       /sys/kernel/debug/hyperv/UUID
#       /sys/kernel/debug/hyperv/UUID/<test-state filename>
#       /sys/kernel/debug/hyperv/UUID/<test-method sub-directory>
#
# author: Branden Bonaby <brandonbonaby94@gmail.com>

import os
import sys
import cmd
import argparse
import glob
from argparse import RawDescriptionHelpFormatter
from argparse import RawTextHelpFormatter
from enum import Enum

# Do not change unless you change the debugfs attributes
# in /drivers/hv/debugfs.c. All fuzz testing
# attributes will start with "fuzz_test".

# debugfs path for hyperv must exist before proceeding
debugfs_hyperv_path = "/sys/kernel/debug/hyperv"

# Only enforce the check when executed as a script: the behaviour of
# running the tool is unchanged (the check still fires before anything
# else), but merely importing the module no longer kills the interpreter.
if __name__ == "__main__" and not os.path.isdir(debugfs_hyperv_path):
    print("{} doesn't exist/check permissions".format(debugfs_hyperv_path))
    # sys.exit instead of the site-injected exit(), which is not
    # guaranteed to exist (e.g. under "python -S").
    sys.exit(-1)


class dev_state(Enum):
    """Integer values written to / read from a device's test-state file."""

    # testing disabled
    off = 0
    # testing enabled
    on = 1


# File names, that correspond to the files created in
# /drivers/hv/debugfs.c
class f_names(Enum):
    """Names of the per-device debugfs attribute files used by this tool."""

    # test on/off state of the device
    state_f = "fuzz_test_state"
    # ring buffer interrupt delay attribute
    buff_f = "fuzz_test_buffer_interrupt_delay"
    # ring buffer message delay attribute
    mess_f = "fuzz_test_message_delay"


# Both single_actions and all_actions are used
# for error checking and to allow for some subparser
# names to be abbreviated. Do not abbreviate the
# test method names, as it will become less intuitive
# as to what the user can do. If you do decide to
# abbreviate the test method name, make sure the main
# function reflects this change.
# Sub-commands/flags that act on ALL devices; validate_args_path()
# rejects a --path given together with any of these.
all_actions = [
    "disable_all",
    "D",
    "enable_all",
    "view_all",
    "V"
]

# Sub-commands/flags that act on a SINGLE device; validate_args_path()
# requires a valid --path for any of these.
single_actions = [
    "disable_single",
    "d",
    "enable_single",
    "view_single",
    "v"
]


def _die(message):
    """Print *message* to stdout and terminate with exit status -1.

    Raises SystemExit directly so the tool does not depend on the
    site-provided exit() helper.
    """
    print(message)
    raise SystemExit(-1)


def _device_name(path):
    """Return the device (UUID) component of an absolute debugfs path.

    For /sys/kernel/debug/hyperv/UUID[/...] the UUID is the sixth
    "/"-separated field. Shorter paths are returned unchanged rather
    than raising IndexError.
    """
    parts = path.split("/")
    return parts[5] if len(parts) > 5 else path


def main(argv=None):
    """Parse the command line and run the requested action.

    argv: optional list of argument strings; None (the default) makes
          argparse read sys.argv[1:], as before.
    """
    file_map = recursive_file_lookup(debugfs_hyperv_path, dict())
    args = parse_args(argv)
    if not args.action:
        _die("Error, no options selected...exiting")

    # Names of every option that was actually supplied, plus the action.
    arg_set = {k for (k, v) in vars(args).items() if v and k != "action"}
    arg_set.add(args.action)

    path = args.path if "path" in arg_set else None
    if path:
        # file_map keys carry no trailing slash
        path = path.rstrip("/")
    validate_args_path(path, arg_set, file_map)

    if path and "enable_single" in arg_set:
        set_test_state(locate_state(path, file_map),
                       dev_state.on.value, args.quiet)

    # Use subparsers as the key for different actions
    action = args.action
    if action == "delay":
        validate_delay_values(args.delay_time)
        if args.enable_all:
            set_delay_all_devices(file_map, args.delay_time, args.quiet)
        else:
            set_delay_values(path, file_map, args.delay_time, args.quiet)
    elif action in ("disable_all", "D"):
        disable_all_testing(file_map)
    elif action in ("disable_single", "d"):
        disable_testing_single_device(path, file_map)
    elif action in ("view_all", "V"):
        get_all_devices_test_status(file_map)
    elif action in ("view_single", "v"):
        get_device_test_values(path, file_map)


def locate_state(device, file_map):
    """Return the path of the test-state file of *device*.

    Raises KeyError when the device or its state file is not in file_map.
    """
    return file_map[device][f_names.state_f.value]


def validate_delay_values(delay):
    """Exit unless *delay* is an acceptable [buffer, message] delay pair.

    Each value must be -1 or satisfy 0 < value <= 1000, and the two
    values must not both be -1.
    """
    if delay[0] == -1 and delay[1] == -1:
        _die("\nError, At least 1 value must be greater than 0")
    for value in delay:
        if value < -1 or value == 0 or value > 1000:
            _die("\nError, Values must be equal to -1 "
                 "or be > 0 and <= 1000")


def validate_args_path(path, arg_set, file_map):
    """Exit unless *path* is consistent with the selected action.

    path:     device path given with -p, or None/empty when absent.
    arg_set:  set of supplied option names plus the action name.
    file_map: device map built by recursive_file_lookup().
    """
    wants_single = any(item in arg_set for item in single_actions)
    wants_all = any(item in arg_set for item in all_actions)

    if not path and wants_single:
        _die("Error, path (-p) REQUIRED for the specified option. "
             "Use (-h) to check usage.")
    elif path and wants_all:
        _die("Error, path (-p) NOT REQUIRED for the specified option. "
             "Use (-h) to check usage.")
    elif path not in file_map and wants_single:
        _die("Error, path '{}' not a valid vmbus device".format(path))


def get_device_test_values(path, file_map):
    """Print "<file name> = <value>" for every test file of one device."""
    for name, file_location in file_map[path].items():
        print(name + " = " + str(read_test_files(file_location)))


def recursive_file_lookup(path, file_map, root=None):
    """Build a map of the vmbus devices and their associated files.

    path:     directory to scan.
    file_map: dict that is filled in place and also returned; shape is
              {device directory: {file name: file path}}.
    root:     debugfs root used to recognise device directories;
              defaults to debugfs_hyperv_path.

    A file whose grandparent directory is *root* is keyed by its parent
    (the device directory); any other file is keyed by its grandparent,
    so files in a test-method sub-directory land under their device.
    """
    if root is None:
        root = debugfs_hyperv_path

    # Join with a real separator: the former pattern (path + '**/*')
    # behaved like 'path*/*' and also matched sibling directories that
    # merely share the same name prefix.
    for f_path in glob.iglob(os.path.join(path, "*")):
        if os.path.isfile(f_path):
            parent, f_name = os.path.split(f_path)
            grandparent = os.path.dirname(parent)
            directory = parent if grandparent == root else grandparent
            file_map.setdefault(directory, {})[f_name] = f_path
        elif os.path.isdir(f_path):
            recursive_file_lookup(f_path, file_map, root)
    return file_map


def get_all_devices_test_status(file_map):
    """Print whether testing is ON or OFF for every device in file_map."""
    for device in file_map:
        state = get_test_state(locate_state(device, file_map))
        status = "ON" if state == 1 else "OFF"
        print("Testing = {} for: {}".format(status, _device_name(device)))


def read_test_files(path):
    """Return the first line of the file at *path* converted to int.

    path must be an absolute path. Exits with status -1 on an I/O error
    or when the content is not an integer.
    """
    try:
        with open(path, "r") as f:
            file_value = f.readline().strip()
        return int(file_value)
    except OSError as e:
        # e.errno/e.strerror are always present, unlike unpacking e.args
        _die("I/O error({0}): {1} on file {2}"
             .format(e.errno, e.strerror, path))
    except ValueError:
        _die("Element to int conversion error in: \n{}".format(path))


def write_test_files(path, value):
    """Write *value* (formatted with str.format) to the file at *path*.

    path must be an absolute path. Exits with status -1 on an I/O error.
    """
    try:
        with open(path, "w") as f:
            f.write("{}".format(value))
    except OSError as e:
        _die("I/O error({0}): {1} on file {2}"
             .format(e.errno, e.strerror, path))


def set_test_state(state_path, state_value, quiet):
    """Write *state_value* to a state file and report the resulting state.

    The state is read back after writing; the report is printed unless
    *quiet* is true.
    """
    write_test_files(state_path, state_value)
    status = "ON" if get_test_state(state_path) == 1 else "OFF"
    if not quiet:
        print("Testing = {} for device: {}"
              .format(status, _device_name(state_path)))


def get_test_state(state_path):
    """Return the integer stored in a state file (1 = ON, 0 = OFF)."""
    return read_test_files(state_path)


def set_delay_values(device, file_map, delay_length, quiet):
    """Write delay values into the delay attributes of a single device.

    device:       device directory (key of file_map).
    delay_length: [buffer interrupt delay, message delay]; a value is
                  written only when 0 <= value <= 1000, so -1 leaves the
                  corresponding attribute untouched.
    quiet:        suppress the read-back report when true.

    Exits with status -1 when the device or one of its delay attributes
    is missing from file_map.
    """
    try:
        interrupt = file_map[device][f_names.buff_f.value]
        message = file_map[device][f_names.mess_f.value]
    except KeyError as err:
        _die("Error, delay attribute {} not found for device '{}'"
             .format(err, device))

    # delay[0]- buffer interrupt delay, delay[1]- message delay
    for target, value in ((interrupt, delay_length[0]),
                          (message, delay_length[1])):
        if 0 <= value <= 1000:
            write_test_files(target, value)

    if not quiet:
        print("Buffer delay testing = {} for: {}"
              .format(read_test_files(interrupt), _device_name(interrupt)))
        print("Message delay testing = {} for: {}"
              .format(read_test_files(message), _device_name(message)))


def set_delay_all_devices(file_map, delay, quiet):
    """Enable testing and apply *delay* on every device in file_map."""
    for device in file_map:
        set_test_state(locate_state(device, file_map),
                       dev_state.on.value, quiet)
        set_delay_values(device, file_map, delay, quiet)


def disable_testing_single_device(device, file_map):
    """Disable all testing on a SINGLE device.

    Writes the "off" value to every test file known for the device.
    """
    for file_location in file_map[device].values():
        write_test_files(file_location, dev_state.off.value)
    print("ALL testing now OFF for {}".format(device.split("/")[-1]))


def disable_all_testing(file_map):
    """Disable all testing on ALL devices in file_map."""
    for device in file_map:
        disable_testing_single_device(device, file_map)


def parse_args(argv=None):
    """Build the command-line parser and return the parsed namespace.

    argv: optional list of argument strings; None (the default) makes
          argparse read sys.argv[1:].

    The chosen sub-command (or alias) is stored in the "action"
    attribute, which is None when no sub-command was given.
    """
    parser = argparse.ArgumentParser(
        prog="vmbus_testing",
        usage="\n"
              "%(prog)s [delay] [-h] [-e|-E] -t [-p]\n"
              "%(prog)s [view_all | V] [-h]\n"
              "%(prog)s [disable_all | D] [-h]\n"
              "%(prog)s [disable_single | d] [-h|-p]\n"
              "%(prog)s [view_single | v] [-h|-p]\n"
              "%(prog)s --version\n",
        description="\nUse lsvmbus to get vmbus device type "
                    "information.\n" "\nThe debugfs root path is "
                    "/sys/kernel/debug/hyperv",
        formatter_class=argparse.RawDescriptionHelpFormatter)
    subparsers = parser.add_subparsers(dest="action")
    parser.add_argument("--version", action="version",
                        version='%(prog)s 0.1.0')
    parser.add_argument("-q", "--quiet", action="store_true",
                        help="silence none important test messages."
                             " This will only work when enabling testing"
                             " on a device.")

    # Use the path parser to hold the --path attribute so it can
    # be shared between subparsers. Also do the same for the state
    # parser, as all testing methods will use --enable_all and
    # enable_single.
    path_parser = argparse.ArgumentParser(add_help=False)
    path_parser.add_argument("-p", "--path", metavar="",
                             help="Debugfs path to a vmbus device. The path "
                                  "must be the absolute path to the device.")
    state_parser = argparse.ArgumentParser(add_help=False)
    state_group = state_parser.add_mutually_exclusive_group(required=True)
    state_group.add_argument("-E", "--enable_all", action="store_const",
                             const="enable_all",
                             help="Enable the specified test type "
                                  "on ALL vmbus devices.")
    state_group.add_argument("-e", "--enable_single",
                             action="store_const",
                             const="enable_single",
                             help="Enable the specified test type on a "
                                  "SINGLE vmbus device.")

    parser_delay = subparsers.add_parser(
        "delay",
        parents=[state_parser, path_parser],
        help="Delay the ring buffer interrupt or the "
             "ring buffer message reads in microseconds.",
        prog="vmbus_testing",
        usage="%(prog)s [-h]\n"
              "%(prog)s -E -t [value] [value]\n"
              "%(prog)s -e -t [value] [value] -p",
        description="Delay the ring buffer interrupt for "
                    "vmbus devices, or delay the ring buffer message "
                    "reads for vmbus devices (both in microseconds). This "
                    "is only on the host to guest channel.")
    parser_delay.add_argument(
        "-t", "--delay_time", metavar="", nargs=2,
        type=check_range, default=[0, 0], required=True,
        help="Set [buffer] & [message] delay time. "
             "Value constraints: -1 == value "
             "or 0 < value <= 1000.\n"
             "Use -1 to keep the previous value for that delay "
             "type, or a value > 0 <= 1000 to change the delay "
             "time.")

    subparsers.add_parser(
        "disable_all",
        aliases=['D'], prog="vmbus_testing",
        usage="%(prog)s [disable_all | D] -h\n"
              "%(prog)s [disable_all | D]\n",
        help="Disable ALL testing on ALL vmbus devices.",
        description="Disable ALL testing on ALL vmbus "
                    "devices.")
    subparsers.add_parser(
        "disable_single",
        aliases=['d'],
        parents=[path_parser], prog="vmbus_testing",
        usage="%(prog)s [disable_single | d] -h\n"
              "%(prog)s [disable_single | d] -p\n",
        help="Disable ALL testing on a SINGLE vmbus device.",
        description="Disable ALL testing on a SINGLE vmbus "
                    "device.")
    subparsers.add_parser(
        "view_all", aliases=['V'],
        help="View the test state for ALL vmbus devices.",
        prog="vmbus_testing",
        usage="%(prog)s [view_all | V] -h\n"
              "%(prog)s [view_all | V]\n",
        description="This shows the test state for ALL the "
                    "vmbus devices.")
    subparsers.add_parser(
        "view_single",
        aliases=['v'], parents=[path_parser],
        help="View the test values for a SINGLE vmbus "
             "device.",
        description="This shows the test values for a SINGLE "
                    "vmbus device.", prog="vmbus_testing",
        usage="%(prog)s [view_single | v] -h\n"
              "%(prog)s [view_single | v] -p")

    return parser.parse_args(argv)


def check_range(arg1):
    """argparse "type" callback validating one delay value.

    Returns int(arg1) when it is -1 or satisfies 0 < value <= 1000.
    Raises argparse.ArgumentTypeError for non-integers and for values
    outside that set (including 0, matching the error message and
    validate_delay_values()).
    """
    try:
        val = int(arg1)
    except ValueError as err:
        raise argparse.ArgumentTypeError(str(err)) from err
    if val < -1 or val == 0 or val > 1000:
        message = ("\n\nvalue must be -1 or 0 < value <= 1000. "
                   "Value program received: {}\n").format(val)
        raise argparse.ArgumentTypeError(message)
    return val


if __name__ == "__main__":
    main()