| #!/usr/bin/env python3 |
| # SPDX-License-Identifier: GPL-2.0 |
| # |
| # Program to allow users to fuzz test Hyper-V drivers |
| # by interfacing with Hyper-V debugfs attributes. |
| # Current test methods available: |
| # 1. delay testing |
| # |
| # Current file/directory structure of hyper-V debugfs: |
| # /sys/kernel/debug/hyperv/UUID |
| # /sys/kernel/debug/hyperv/UUID/<test-state filename> |
| # /sys/kernel/debug/hyperv/UUID/<test-method sub-directory> |
| # |
| # author: Branden Bonaby <brandonbonaby94@gmail.com> |
| |
| import os |
| import cmd |
| import argparse |
| import glob |
| from argparse import RawDescriptionHelpFormatter |
| from argparse import RawTextHelpFormatter |
| from enum import Enum |
| |
# Do not change unless you also change the debugfs attributes
# in /drivers/hv/debugfs.c. All fuzz testing
# attributes will start with "fuzz_test".
| |
# Every action in this tool reads or writes files below the Hyper-V
# debugfs directory, so give up immediately when it is not there.
debugfs_hyperv_path = "/sys/kernel/debug/hyperv"
if not os.path.isdir(debugfs_hyperv_path):
    missing_msg = "{} doesn't exist/check permissions".format(
        debugfs_hyperv_path)
    print(missing_msg)
    exit(-1)
| |
class dev_state(Enum):
    """Numeric on/off values this tool writes to debugfs attributes."""

    # 0 switches testing off, 1 switches it on.
    off = 0
    on = 1
| |
| # File names, that correspond to the files created in |
| # /drivers/hv/debugfs.c |
class f_names(Enum):
    """Names of the per-device debugfs attribute files used by this tool."""

    # Test on/off state of a device.
    state_f = "fuzz_test_state"
    # Ring buffer interrupt delay.
    buff_f = "fuzz_test_buffer_interrupt_delay"
    # Ring buffer message delay.
    mess_f = "fuzz_test_message_delay"
| |
| # Both single_actions and all_actions are used |
| # for error checking and to allow for some subparser |
| # names to be abbreviated. Do not abbreviate the |
| # test method names, as it will become less intuitive |
| # as to what the user can do. If you do decide to |
| # abbreviate the test method name, make sure the main |
| # function reflects this change. |
| |
# Sub-commands / option names that act on every vmbus device; giving a
# device path together with one of these is rejected.
all_actions = ["disable_all", "D", "enable_all", "view_all", "V"]

# Sub-commands / option names that act on one vmbus device and therefore
# need a device path.
single_actions = ["disable_single", "d", "enable_single", "view_single", "v"]
| |
def main():
    """Run the vmbus fuzz-testing tool for the current command line.

    The debugfs tree is scanned into a device -> files map, the command
    line is parsed and checked, and the handler matching the chosen
    sub-command is invoked. The process exits with -1 when no
    sub-command was supplied.
    """
    file_map = recursive_file_lookup(debugfs_hyperv_path, dict())
    args = parse_args()
    if not args.action:
        print ("Error, no options selected...exiting")
        exit(-1)

    # Collect the name of every option holding a truthy value, then add
    # the sub-command itself; all checks below are membership tests on
    # this set.
    chosen = set()
    for opt_name, opt_value in vars(args).items():
        if opt_value and opt_name != "action":
            chosen.add(opt_name)
    chosen.add(args.action)

    path = args.path if "path" in chosen else None
    if path and path.endswith("/"):
        # Strip a single trailing slash; file_map is keyed by this exact
        # string.
        path = path[:-1]
    validate_args_path(path, chosen, file_map)

    if path and "enable_single" in chosen:
        set_test_state(locate_state(path, file_map),
                       dev_state.on.value, args.quiet)

    # The sub-command name (or its one-letter alias) selects the action.
    if "delay" in chosen:
        validate_delay_values(args.delay_time)
        if args.enable_all:
            set_delay_all_devices(file_map, args.delay_time, args.quiet)
        else:
            set_delay_values(path, file_map, args.delay_time, args.quiet)
        return

    dispatch = (
        (("disable_all", "D"),
         lambda: disable_all_testing(file_map)),
        (("disable_single", "d"),
         lambda: disable_testing_single_device(path, file_map)),
        (("view_all", "V"),
         lambda: get_all_devices_test_status(file_map)),
        (("view_single", "v"),
         lambda: get_device_test_values(path, file_map)),
    )
    for names, handler in dispatch:
        if any(name in chosen for name in names):
            handler()
            break
| |
| # Get the state location |
def locate_state(device, file_map):
    """Return the path of the test-state attribute recorded for *device*.

    Raises KeyError when *device* is not in *file_map* or has no
    state file entry.
    """
    device_files = file_map[device]
    return device_files[f_names.state_f.value]
| |
| # Validate delay values to make sure they are acceptable to |
| # enable delays on a device |
def validate_delay_values(delay):
    """Exit with -1 unless *delay* is an acceptable pair of delay values.

    *delay* holds the buffer-interrupt delay followed by the message
    delay. Each value must be -1 or lie in 1..1000, and the two values
    may not both be -1.
    """
    # Two -1 values would leave nothing to change.
    if all(delay[idx] == -1 for idx in (0, 1)):
        print("\nError, At least 1 value must be greater than 0")
        exit(-1)

    out_of_range = any(v < -1 or v == 0 or v > 1000 for v in delay)
    if out_of_range:
        print("\nError, Values must be equal to -1 "
              "or be > 0 and <= 1000")
        exit(-1)
| |
| # Validate argument path |
def validate_args_path(path, arg_set, file_map):
    """Exit with -1 when *path* does not fit the selected action.

    A path is required for the single-device actions, must be absent
    for the all-device actions, and for single-device actions must be a
    key of *file_map*.
    """
    needs_path = any(name in arg_set for name in single_actions)
    forbids_path = any(name in arg_set for name in all_actions)

    if not path and needs_path:
        problem = ("Error, path (-p) REQUIRED for the specified option. "
                   "Use (-h) to check usage.")
    elif path and forbids_path:
        problem = ("Error, path (-p) NOT REQUIRED for the specified option. "
                   "Use (-h) to check usage.")
    elif path not in file_map and needs_path:
        problem = "Error, path '{}' not a valid vmbus device".format(path)
    else:
        # Nothing to complain about.
        return

    print(problem)
    exit(-1)
| |
| # display Testing status of single device |
def get_device_test_values(path, file_map):
    """Print ``<file name> = <value>`` for every file recorded for *path*."""
    for name, file_location in file_map[path].items():
        current = read_test_files(file_location)
        print( name + " = " + str(current))
| |
| # Create a map of the vmbus devices and their associated files |
| # [key=device, value = [key = filename, value = file path]] |
def recursive_file_lookup(path, file_map):
    """Fill *file_map* with the files found below *path* and return it.

    The map has the shape ``{directory: {file name: file path}}``.
    A file whose grandparent is the debugfs root is stored under its
    parent directory (the device directory); any other file is stored
    under its grandparent, so files inside a test-method sub-directory
    end up under the same device key.

    Only entries that are really inside *path* are visited. The pattern
    is built with os.path.join, so a sibling directory that merely shares
    the prefix (for example "hyperv_other") is no longer matched, and
    glob metacharacters in *path* are escaped.
    """
    for f_path in glob.iglob(os.path.join(glob.escape(path), "*")):
        if os.path.isfile(f_path):
            parent, f_name = os.path.split(f_path)
            grandparent = os.path.dirname(parent)
            if grandparent == debugfs_hyperv_path:
                directory = parent
            else:
                directory = grandparent
            file_map.setdefault(directory, {})[f_name] = f_path
        elif os.path.isdir(f_path):
            recursive_file_lookup(f_path, file_map)
    return file_map
| |
| # display Testing state of devices |
def get_all_devices_test_status(file_map):
    """Print whether testing is ON or OFF for every device in *file_map*.

    The device name printed is the sixth "/"-separated component of the
    device key, i.e. the directory directly below
    /sys/kernel/debug/hyperv.
    """
    for device in file_map:
        state = get_test_state(locate_state(device, file_map))
        # Compare by value: "is 1" only worked through CPython's small
        # integer cache and triggers a SyntaxWarning on Python >= 3.8.
        status = "ON" if state == 1 else "OFF"
        print("Testing = {} for: {}"
              .format(status, device.split("/")[5]))
| |
| # read the vmbus device files, path must be absolute path before calling |
def read_test_files(path):
    """Return the integer stored on the first line of the file at *path*.

    Prints a diagnostic and exits with -1 when the file cannot be read
    or its first line is not an integer.
    """
    try:
        with open(path,"r") as f:
            file_value = f.readline().strip()
        return int(file_value)

    except IOError as e:
        # Read errno/strerror from the exception attributes instead of
        # unpacking e.args, which fails when the error does not carry
        # exactly two arguments.
        print("I/O error({0}): {1} on file {2}"
              .format(e.errno, e.strerror or e, path))
        exit(-1)
    except ValueError:
        print ("Element to int conversion error in: \n{}".format(path))
        exit(-1)
| |
| # writing to vmbus device files, path must be absolute path before calling |
def write_test_files(path, value):
    """Write *value*, formatted as text, to the file at *path*.

    Prints a diagnostic and exits with -1 when the file cannot be
    written.
    """
    try:
        with open(path,"w") as f:
            f.write("{}".format(value))
    except IOError as e:
        # Read errno/strerror from the exception attributes instead of
        # unpacking e.args, which fails when the error does not carry
        # exactly two arguments.
        print("I/O error({0}): {1} on file {2}"
              .format(e.errno, e.strerror or e, path))
        exit(-1)
| |
| # set testing state of device |
def set_test_state(state_path, state_value, quiet):
    """Write *state_value* to a device's state file and report the result.

    The state is read back after the write; unless *quiet* is true the
    resulting ON/OFF status is printed together with the sixth
    "/"-separated component of *state_path* (the device directory name
    for paths below /sys/kernel/debug/hyperv).
    """
    write_test_files(state_path, state_value)
    # The read-back happens even in quiet mode, as before.
    state = get_test_state(state_path)
    if quiet:
        return
    # Compare by value: "is 1" only worked through CPython's small
    # integer cache and triggers a SyntaxWarning on Python >= 3.8.
    status = "ON" if state == 1 else "OFF"
    print("Testing = {} for device: {}"
          .format(status, state_path.split("/")[5]))
| |
| # get testing state of device |
def get_test_state(state_path):
    """Return the integer read from the state file at *state_path*."""
    # 1 -> testing is ON, 0 -> testing is OFF
    state = read_test_files(state_path)
    return state
| |
| # write 1 - 1000 microseconds, into a single device using the |
| # fuzz_test_buffer_interrupt_delay and fuzz_test_message_delay |
| # debugfs attributes |
def set_delay_values(device, file_map, delay_length, quiet):
    """Write the delay values for one device and optionally report them.

    delay_length[0] is the buffer interrupt delay and delay_length[1]
    the message delay. A value is written only when it lies in
    0..1000, so -1 leaves the corresponding attribute untouched. Unless
    *quiet* is true, both attributes are read back and printed.

    Raises KeyError when *device* or one of its delay files is missing
    from *file_map*.
    """
    # Resolve both paths before the try block so the error handler can
    # always refer to them.
    interrupt = file_map[device][f_names.buff_f.value]
    message = file_map[device][f_names.mess_f.value]

    try:
        if 0 <= delay_length[0] <= 1000:
            write_test_files(interrupt, delay_length[0])
        if 0 <= delay_length[1] <= 1000:
            write_test_files(message, delay_length[1])
        if not quiet:
            print("Buffer delay testing = {} for: {}"
                  .format(read_test_files(interrupt),
                          interrupt.split("/")[5]))
            print("Message delay testing = {} for: {}"
                  .format(read_test_files(message),
                          message.split("/")[5]))
    except IOError as e:
        # Use the exception attributes rather than unpacking e.args, and
        # separate the two paths so the message stays readable.
        print("I/O error({0}): {1} on files {2} and {3}"
              .format(e.errno, e.strerror or e, interrupt, message))
        exit(-1)
| |
| # enabling delay testing on all devices |
def set_delay_all_devices(file_map, delay, quiet):
    """Switch testing on and apply *delay* for every device in *file_map*."""
    for device in file_map:
        state_path = locate_state(device, file_map)
        set_test_state(state_path, dev_state.on.value, quiet)
        set_delay_values(device, file_map, delay, quiet)
| |
| # disable all testing on a SINGLE device. |
def disable_testing_single_device(device, file_map):
    """Write the "off" value to every file recorded for *device*.

    Afterwards a confirmation naming the last path component of
    *device* is printed.
    """
    off_value = dev_state.off.value
    for file_location in file_map[device].values():
        write_test_files(file_location, off_value)
    print("ALL testing now OFF for {}".format(device.split("/")[-1]))
| |
| # disable all testing on ALL devices |
def disable_all_testing(file_map):
    """Disable all testing on each device listed in *file_map*."""
    for dev in file_map:
        disable_testing_single_device(dev, file_map)
| |
def parse_args():
    """Build the vmbus_testing command-line parser and parse the arguments.

    Returns the argparse namespace. Its ``action`` attribute holds the
    sub-command (or alias) that was given; the remaining attributes
    depend on the sub-command.
    """
    prog_name = "vmbus_testing"

    top_usage = (
        "\n"
        "%(prog)s [delay] [-h] [-e|-E] -t [-p]\n"
        "%(prog)s [view_all | V] [-h]\n"
        "%(prog)s [disable_all | D] [-h]\n"
        "%(prog)s [disable_single | d] [-h|-p]\n"
        "%(prog)s [view_single | v] [-h|-p]\n"
        "%(prog)s --version\n"
    )
    top_description = (
        "\nUse lsvmbus to get vmbus device type information.\n"
        "\nThe debugfs root path is /sys/kernel/debug/hyperv"
    )
    parser = argparse.ArgumentParser(
        prog=prog_name,
        usage=top_usage,
        description=top_description,
        formatter_class=argparse.RawDescriptionHelpFormatter)
    subparsers = parser.add_subparsers(dest="action")
    parser.add_argument("--version", action="version",
                        version='%(prog)s 0.1.0')
    parser.add_argument(
        "-q", "--quiet", action="store_true",
        help="silence none important test messages. "
             "This will only work when enabling testing on a device.")

    # --path lives on its own parent parser so several sub-commands can
    # share it; the enable flags are shared the same way.
    path_parser = argparse.ArgumentParser(add_help=False)
    path_parser.add_argument(
        "-p", "--path", metavar="",
        help="Debugfs path to a vmbus device. "
             "The path must be the absolute path to the device.")

    state_parser = argparse.ArgumentParser(add_help=False)
    state_group = state_parser.add_mutually_exclusive_group(required=True)
    state_group.add_argument(
        "-E", "--enable_all", action="store_const", const="enable_all",
        help="Enable the specified test type on ALL vmbus devices.")
    state_group.add_argument(
        "-e", "--enable_single", action="store_const",
        const="enable_single",
        help="Enable the specified test type on a SINGLE vmbus device.")

    # delay
    delay_parser = subparsers.add_parser(
        "delay",
        parents=[state_parser, path_parser],
        prog=prog_name,
        help="Delay the ring buffer interrupt or the "
             "ring buffer message reads in microseconds.",
        usage="%(prog)s [-h]\n"
              "%(prog)s -E -t [value] [value]\n"
              "%(prog)s -e -t [value] [value] -p",
        description="Delay the ring buffer interrupt for vmbus devices, "
                    "or delay the ring buffer message reads for vmbus "
                    "devices (both in microseconds). "
                    "This is only on the host to guest channel.")
    delay_parser.add_argument(
        "-t", "--delay_time", metavar="", nargs=2,
        type=check_range, default=[0, 0], required=True,
        help="Set [buffer] & [message] delay time. "
             "Value constraints: -1 == value or 0 < value <= 1000.\n"
             "Use -1 to keep the previous value for that delay type, "
             "or a value > 0 <= 1000 to change the delay time.")

    # disable_all | D
    subparsers.add_parser(
        "disable_all", aliases=['D'], prog=prog_name,
        usage="%(prog)s [disable_all | D] -h\n"
              "%(prog)s [disable_all | D]\n",
        help="Disable ALL testing on ALL vmbus devices.",
        description="Disable ALL testing on ALL vmbus devices.")

    # disable_single | d
    subparsers.add_parser(
        "disable_single", aliases=['d'], parents=[path_parser],
        prog=prog_name,
        usage="%(prog)s [disable_single | d] -h\n"
              "%(prog)s [disable_single | d] -p\n",
        help="Disable ALL testing on a SINGLE vmbus device.",
        description="Disable ALL testing on a SINGLE vmbus device.")

    # view_all | V
    subparsers.add_parser(
        "view_all", aliases=['V'], prog=prog_name,
        help="View the test state for ALL vmbus devices.",
        usage="%(prog)s [view_all | V] -h\n"
              "%(prog)s [view_all | V]\n",
        description="This shows the test state for ALL the vmbus devices.")

    # view_single | v
    subparsers.add_parser(
        "view_single", aliases=['v'], parents=[path_parser],
        prog=prog_name,
        help="View the test values for a SINGLE vmbus device.",
        description="This shows the test values for a SINGLE vmbus device.",
        usage="%(prog)s [view_single | v] -h\n"
              "%(prog)s [view_single | v] -p")

    return parser.parse_args()
| |
| # value checking for range checking input in parser |
def check_range(arg1):
    """argparse ``type`` callback: convert *arg1* to a valid delay value.

    Returns the integer when it is -1 or lies in 1..1000. Raises
    argparse.ArgumentTypeError when *arg1* is not an integer or is
    outside that set. Zero is rejected here too, matching the error
    message below and the later check in validate_delay_values.
    """
    try:
        val = int(arg1)
    except ValueError as err:
        raise argparse.ArgumentTypeError(str(err)) from err
    if val != -1 and not 0 < val <= 1000:
        message = ("\n\nvalue must be -1 or 0 < value <= 1000. "
                   "Value program received: {}\n").format(val)
        raise argparse.ArgumentTypeError(message)
    return val
| |
# Run the tool only when executed as a script, not when imported.
if __name__ == "__main__":
    main()