1#!/usr/bin/env python3 2# SPDX-License-Identifier: GPL-2.0 3# 4# A thin wrapper on top of the KUnit Kernel 5# 6# Copyright (C) 2019, Google LLC. 7# Author: Felix Guo <felixguoxiuping@gmail.com> 8# Author: Brendan Higgins <brendanhiggins@google.com> 9 10import argparse 11import os 12import re 13import sys 14import time 15 16assert sys.version_info >= (3, 7), "Python version is too old" 17 18from dataclasses import dataclass 19from enum import Enum, auto 20from typing import Iterable, List, Optional, Sequence, Tuple 21 22import kunit_json 23import kunit_kernel 24import kunit_parser 25 26class KunitStatus(Enum): 27 SUCCESS = auto() 28 CONFIG_FAILURE = auto() 29 BUILD_FAILURE = auto() 30 TEST_FAILURE = auto() 31 32@dataclass 33class KunitResult: 34 status: KunitStatus 35 elapsed_time: float 36 37@dataclass 38class KunitConfigRequest: 39 build_dir: str 40 make_options: Optional[List[str]] 41 42@dataclass 43class KunitBuildRequest(KunitConfigRequest): 44 jobs: int 45 alltests: bool 46 47@dataclass 48class KunitParseRequest: 49 raw_output: Optional[str] 50 json: Optional[str] 51 52@dataclass 53class KunitExecRequest(KunitParseRequest): 54 build_dir: str 55 timeout: int 56 alltests: bool 57 filter_glob: str 58 kernel_args: Optional[List[str]] 59 run_isolated: Optional[str] 60 61@dataclass 62class KunitRequest(KunitExecRequest, KunitBuildRequest): 63 pass 64 65 66def get_kernel_root_path() -> str: 67 path = sys.argv[0] if not __file__ else __file__ 68 parts = os.path.realpath(path).split('tools/testing/kunit') 69 if len(parts) != 2: 70 sys.exit(1) 71 return parts[0] 72 73def config_tests(linux: kunit_kernel.LinuxSourceTree, 74 request: KunitConfigRequest) -> KunitResult: 75 kunit_parser.print_with_timestamp('Configuring KUnit Kernel ...') 76 77 config_start = time.time() 78 success = linux.build_reconfig(request.build_dir, request.make_options) 79 config_end = time.time() 80 if not success: 81 return KunitResult(KunitStatus.CONFIG_FAILURE, 82 config_end - config_start) 83 return KunitResult(KunitStatus.SUCCESS, 84 config_end - config_start) 85 86def build_tests(linux: kunit_kernel.LinuxSourceTree, 87 request: KunitBuildRequest) -> KunitResult: 88 kunit_parser.print_with_timestamp('Building KUnit Kernel ...') 89 90 build_start = time.time() 91 success = linux.build_kernel(request.alltests, 92 request.jobs, 93 request.build_dir, 94 request.make_options) 95 build_end = time.time() 96 if not success: 97 return KunitResult(KunitStatus.BUILD_FAILURE, 98 build_end - build_start) 99 if not success: 100 return KunitResult(KunitStatus.BUILD_FAILURE, 101 build_end - build_start) 102 return KunitResult(KunitStatus.SUCCESS, 103 build_end - build_start) 104 105def config_and_build_tests(linux: kunit_kernel.LinuxSourceTree, 106 request: KunitBuildRequest) -> KunitResult: 107 config_result = config_tests(linux, request) 108 if config_result.status != KunitStatus.SUCCESS: 109 return config_result 110 111 return build_tests(linux, request) 112 113def _list_tests(linux: kunit_kernel.LinuxSourceTree, request: KunitExecRequest) -> List[str]: 114 args = ['kunit.action=list'] 115 if request.kernel_args: 116 args.extend(request.kernel_args) 117 118 output = linux.run_kernel(args=args, 119 timeout=None if request.alltests else request.timeout, 120 filter_glob=request.filter_glob, 121 build_dir=request.build_dir) 122 lines = kunit_parser.extract_tap_lines(output) 123 # Hack! Drop the dummy TAP version header that the executor prints out. 124 lines.pop() 125 126 # Filter out any extraneous non-test output that might have gotten mixed in. 127 return [l for l in lines if re.match('^[^\s.]+\.[^\s.]+$', l)] 128 129def _suites_from_test_list(tests: List[str]) -> List[str]: 130 """Extracts all the suites from an ordered list of tests.""" 131 suites = [] # type: List[str] 132 for t in tests: 133 parts = t.split('.', maxsplit=2) 134 if len(parts) != 2: 135 raise ValueError(f'internal KUnit error, test name should be of the form "<suite>.<test>", got "{t}"') 136 suite, case = parts 137 if not suites or suites[-1] != suite: 138 suites.append(suite) 139 return suites 140 141 142 143def exec_tests(linux: kunit_kernel.LinuxSourceTree, request: KunitExecRequest) -> KunitResult: 144 filter_globs = [request.filter_glob] 145 if request.run_isolated: 146 tests = _list_tests(linux, request) 147 if request.run_isolated == 'test': 148 filter_globs = tests 149 if request.run_isolated == 'suite': 150 filter_globs = _suites_from_test_list(tests) 151 # Apply the test-part of the user's glob, if present. 152 if '.' in request.filter_glob: 153 test_glob = request.filter_glob.split('.', maxsplit=2)[1] 154 filter_globs = [g + '.'+ test_glob for g in filter_globs] 155 156 metadata = kunit_json.Metadata(arch=linux.arch(), build_dir=request.build_dir, def_config='kunit_defconfig') 157 158 test_counts = kunit_parser.TestCounts() 159 exec_time = 0.0 160 for i, filter_glob in enumerate(filter_globs): 161 kunit_parser.print_with_timestamp('Starting KUnit Kernel ({}/{})...'.format(i+1, len(filter_globs))) 162 163 test_start = time.time() 164 run_result = linux.run_kernel( 165 args=request.kernel_args, 166 timeout=None if request.alltests else request.timeout, 167 filter_glob=filter_glob, 168 build_dir=request.build_dir) 169 170 _, test_result = parse_tests(request, metadata, run_result) 171 # run_kernel() doesn't block on the kernel exiting. 172 # That only happens after we get the last line of output from `run_result`. 173 # So exec_time here actually contains parsing + execution time, which is fine. 174 test_end = time.time() 175 exec_time += test_end - test_start 176 177 test_counts.add_subtest_counts(test_result.counts) 178 179 if len(filter_globs) == 1 and test_counts.crashed > 0: 180 bd = request.build_dir 181 print('The kernel seems to have crashed; you can decode the stack traces with:') 182 print('$ scripts/decode_stacktrace.sh {}/vmlinux {} < {} | tee {}/decoded.log | {} parse'.format( 183 bd, bd, kunit_kernel.get_outfile_path(bd), bd, sys.argv[0])) 184 185 kunit_status = _map_to_overall_status(test_counts.get_status()) 186 return KunitResult(status=kunit_status, elapsed_time=exec_time) 187 188def _map_to_overall_status(test_status: kunit_parser.TestStatus) -> KunitStatus: 189 if test_status in (kunit_parser.TestStatus.SUCCESS, kunit_parser.TestStatus.SKIPPED): 190 return KunitStatus.SUCCESS 191 else: 192 return KunitStatus.TEST_FAILURE 193 194def parse_tests(request: KunitParseRequest, metadata: kunit_json.Metadata, input_data: Iterable[str]) -> Tuple[KunitResult, kunit_parser.Test]: 195 parse_start = time.time() 196 197 test_result = kunit_parser.Test() 198 199 if request.raw_output: 200 # Treat unparsed results as one passing test. 201 test_result.status = kunit_parser.TestStatus.SUCCESS 202 test_result.counts.passed = 1 203 204 output: Iterable[str] = input_data 205 if request.raw_output == 'all': 206 pass 207 elif request.raw_output == 'kunit': 208 output = kunit_parser.extract_tap_lines(output) 209 else: 210 print(f'Unknown --raw_output option "{request.raw_output}"', file=sys.stderr) 211 for line in output: 212 print(line.rstrip()) 213 214 else: 215 test_result = kunit_parser.parse_run_tests(input_data) 216 parse_end = time.time() 217 218 if request.json: 219 json_str = kunit_json.get_json_result( 220 test=test_result, 221 metadata=metadata) 222 if request.json == 'stdout': 223 print(json_str) 224 else: 225 with open(request.json, 'w') as f: 226 f.write(json_str) 227 kunit_parser.print_with_timestamp("Test results stored in %s" % 228 os.path.abspath(request.json)) 229 230 if test_result.status != kunit_parser.TestStatus.SUCCESS: 231 return KunitResult(KunitStatus.TEST_FAILURE, parse_end - parse_start), test_result 232 233 return KunitResult(KunitStatus.SUCCESS, parse_end - parse_start), test_result 234 235def run_tests(linux: kunit_kernel.LinuxSourceTree, 236 request: KunitRequest) -> KunitResult: 237 run_start = time.time() 238 239 config_result = config_tests(linux, request) 240 if config_result.status != KunitStatus.SUCCESS: 241 return config_result 242 243 build_result = build_tests(linux, request) 244 if build_result.status != KunitStatus.SUCCESS: 245 return build_result 246 247 exec_result = exec_tests(linux, request) 248 249 run_end = time.time() 250 251 kunit_parser.print_with_timestamp(( 252 'Elapsed time: %.3fs total, %.3fs configuring, %.3fs ' + 253 'building, %.3fs running\n') % ( 254 run_end - run_start, 255 config_result.elapsed_time, 256 build_result.elapsed_time, 257 exec_result.elapsed_time)) 258 return exec_result 259 260# Problem: 261# $ kunit.py run --json 262# works as one would expect and prints the parsed test results as JSON. 263# $ kunit.py run --json suite_name 264# would *not* pass suite_name as the filter_glob and print as json. 265# argparse will consider it to be another way of writing 266# $ kunit.py run --json=suite_name 267# i.e. it would run all tests, and dump the json to a `suite_name` file. 268# So we hackily automatically rewrite --json => --json=stdout 269pseudo_bool_flag_defaults = { 270 '--json': 'stdout', 271 '--raw_output': 'kunit', 272} 273def massage_argv(argv: Sequence[str]) -> Sequence[str]: 274 def massage_arg(arg: str) -> str: 275 if arg not in pseudo_bool_flag_defaults: 276 return arg 277 return f'{arg}={pseudo_bool_flag_defaults[arg]}' 278 return list(map(massage_arg, argv)) 279 280def get_default_jobs() -> int: 281 return len(os.sched_getaffinity(0)) 282 283def add_common_opts(parser) -> None: 284 parser.add_argument('--build_dir', 285 help='As in the make command, it specifies the build ' 286 'directory.', 287 type=str, default='.kunit', metavar='build_dir') 288 parser.add_argument('--make_options', 289 help='X=Y make option, can be repeated.', 290 action='append') 291 parser.add_argument('--alltests', 292 help='Run all KUnit tests through allyesconfig', 293 action='store_true') 294 parser.add_argument('--kunitconfig', 295 help='Path to Kconfig fragment that enables KUnit tests.' 296 ' If given a directory, (e.g. lib/kunit), "/.kunitconfig" ' 297 'will get automatically appended.', 298 metavar='kunitconfig') 299 parser.add_argument('--kconfig_add', 300 help='Additional Kconfig options to append to the ' 301 '.kunitconfig, e.g. CONFIG_KASAN=y. Can be repeated.', 302 action='append') 303 304 parser.add_argument('--arch', 305 help=('Specifies the architecture to run tests under. ' 306 'The architecture specified here must match the ' 307 'string passed to the ARCH make param, ' 308 'e.g. i386, x86_64, arm, um, etc. Non-UML ' 309 'architectures run on QEMU.'), 310 type=str, default='um', metavar='arch') 311 312 parser.add_argument('--cross_compile', 313 help=('Sets make\'s CROSS_COMPILE variable; it should ' 314 'be set to a toolchain path prefix (the prefix ' 315 'of gcc and other tools in your toolchain, for ' 316 'example `sparc64-linux-gnu-` if you have the ' 317 'sparc toolchain installed on your system, or ' 318 '`$HOME/toolchains/microblaze/gcc-9.2.0-nolibc/microblaze-linux/bin/microblaze-linux-` ' 319 'if you have downloaded the microblaze toolchain ' 320 'from the 0-day website to a directory in your ' 321 'home directory called `toolchains`).'), 322 metavar='cross_compile') 323 324 parser.add_argument('--qemu_config', 325 help=('Takes a path to a path to a file containing ' 326 'a QemuArchParams object.'), 327 type=str, metavar='qemu_config') 328 329def add_build_opts(parser) -> None: 330 parser.add_argument('--jobs', 331 help='As in the make command, "Specifies the number of ' 332 'jobs (commands) to run simultaneously."', 333 type=int, default=get_default_jobs(), metavar='jobs') 334 335def add_exec_opts(parser) -> None: 336 parser.add_argument('--timeout', 337 help='maximum number of seconds to allow for all tests ' 338 'to run. This does not include time taken to build the ' 339 'tests.', 340 type=int, 341 default=300, 342 metavar='timeout') 343 parser.add_argument('filter_glob', 344 help='Filter which KUnit test suites/tests run at ' 345 'boot-time, e.g. list* or list*.*del_test', 346 type=str, 347 nargs='?', 348 default='', 349 metavar='filter_glob') 350 parser.add_argument('--kernel_args', 351 help='Kernel command-line parameters. Maybe be repeated', 352 action='append') 353 parser.add_argument('--run_isolated', help='If set, boot the kernel for each ' 354 'individual suite/test. This is can be useful for debugging ' 355 'a non-hermetic test, one that might pass/fail based on ' 356 'what ran before it.', 357 type=str, 358 choices=['suite', 'test']), 359 360def add_parse_opts(parser) -> None: 361 parser.add_argument('--raw_output', help='If set don\'t format output from kernel. ' 362 'If set to --raw_output=kunit, filters to just KUnit output.', 363 type=str, nargs='?', const='all', default=None) 364 parser.add_argument('--json', 365 nargs='?', 366 help='Stores test results in a JSON, and either ' 367 'prints to stdout or saves to file if a ' 368 'filename is specified', 369 type=str, const='stdout', default=None) 370 371def main(argv, linux=None): 372 parser = argparse.ArgumentParser( 373 description='Helps writing and running KUnit tests.') 374 subparser = parser.add_subparsers(dest='subcommand') 375 376 # The 'run' command will config, build, exec, and parse in one go. 377 run_parser = subparser.add_parser('run', help='Runs KUnit tests.') 378 add_common_opts(run_parser) 379 add_build_opts(run_parser) 380 add_exec_opts(run_parser) 381 add_parse_opts(run_parser) 382 383 config_parser = subparser.add_parser('config', 384 help='Ensures that .config contains all of ' 385 'the options in .kunitconfig') 386 add_common_opts(config_parser) 387 388 build_parser = subparser.add_parser('build', help='Builds a kernel with KUnit tests') 389 add_common_opts(build_parser) 390 add_build_opts(build_parser) 391 392 exec_parser = subparser.add_parser('exec', help='Run a kernel with KUnit tests') 393 add_common_opts(exec_parser) 394 add_exec_opts(exec_parser) 395 add_parse_opts(exec_parser) 396 397 # The 'parse' option is special, as it doesn't need the kernel source 398 # (therefore there is no need for a build_dir, hence no add_common_opts) 399 # and the '--file' argument is not relevant to 'run', so isn't in 400 # add_parse_opts() 401 parse_parser = subparser.add_parser('parse', 402 help='Parses KUnit results from a file, ' 403 'and parses formatted results.') 404 add_parse_opts(parse_parser) 405 parse_parser.add_argument('file', 406 help='Specifies the file to read results from.', 407 type=str, nargs='?', metavar='input_file') 408 409 cli_args = parser.parse_args(massage_argv(argv)) 410 411 if get_kernel_root_path(): 412 os.chdir(get_kernel_root_path()) 413 414 if cli_args.subcommand == 'run': 415 if not os.path.exists(cli_args.build_dir): 416 os.mkdir(cli_args.build_dir) 417 418 if not linux: 419 linux = kunit_kernel.LinuxSourceTree(cli_args.build_dir, 420 kunitconfig_path=cli_args.kunitconfig, 421 kconfig_add=cli_args.kconfig_add, 422 arch=cli_args.arch, 423 cross_compile=cli_args.cross_compile, 424 qemu_config_path=cli_args.qemu_config) 425 426 request = KunitRequest(build_dir=cli_args.build_dir, 427 make_options=cli_args.make_options, 428 jobs=cli_args.jobs, 429 alltests=cli_args.alltests, 430 raw_output=cli_args.raw_output, 431 json=cli_args.json, 432 timeout=cli_args.timeout, 433 filter_glob=cli_args.filter_glob, 434 kernel_args=cli_args.kernel_args, 435 run_isolated=cli_args.run_isolated) 436 result = run_tests(linux, request) 437 if result.status != KunitStatus.SUCCESS: 438 sys.exit(1) 439 elif cli_args.subcommand == 'config': 440 if cli_args.build_dir and ( 441 not os.path.exists(cli_args.build_dir)): 442 os.mkdir(cli_args.build_dir) 443 444 if not linux: 445 linux = kunit_kernel.LinuxSourceTree(cli_args.build_dir, 446 kunitconfig_path=cli_args.kunitconfig, 447 kconfig_add=cli_args.kconfig_add, 448 arch=cli_args.arch, 449 cross_compile=cli_args.cross_compile, 450 qemu_config_path=cli_args.qemu_config) 451 452 request = KunitConfigRequest(build_dir=cli_args.build_dir, 453 make_options=cli_args.make_options) 454 result = config_tests(linux, request) 455 kunit_parser.print_with_timestamp(( 456 'Elapsed time: %.3fs\n') % ( 457 result.elapsed_time)) 458 if result.status != KunitStatus.SUCCESS: 459 sys.exit(1) 460 elif cli_args.subcommand == 'build': 461 if not linux: 462 linux = kunit_kernel.LinuxSourceTree(cli_args.build_dir, 463 kunitconfig_path=cli_args.kunitconfig, 464 kconfig_add=cli_args.kconfig_add, 465 arch=cli_args.arch, 466 cross_compile=cli_args.cross_compile, 467 qemu_config_path=cli_args.qemu_config) 468 469 request = KunitBuildRequest(build_dir=cli_args.build_dir, 470 make_options=cli_args.make_options, 471 jobs=cli_args.jobs, 472 alltests=cli_args.alltests) 473 result = config_and_build_tests(linux, request) 474 kunit_parser.print_with_timestamp(( 475 'Elapsed time: %.3fs\n') % ( 476 result.elapsed_time)) 477 if result.status != KunitStatus.SUCCESS: 478 sys.exit(1) 479 elif cli_args.subcommand == 'exec': 480 if not linux: 481 linux = kunit_kernel.LinuxSourceTree(cli_args.build_dir, 482 kunitconfig_path=cli_args.kunitconfig, 483 kconfig_add=cli_args.kconfig_add, 484 arch=cli_args.arch, 485 cross_compile=cli_args.cross_compile, 486 qemu_config_path=cli_args.qemu_config) 487 488 exec_request = KunitExecRequest(raw_output=cli_args.raw_output, 489 build_dir=cli_args.build_dir, 490 json=cli_args.json, 491 timeout=cli_args.timeout, 492 alltests=cli_args.alltests, 493 filter_glob=cli_args.filter_glob, 494 kernel_args=cli_args.kernel_args, 495 run_isolated=cli_args.run_isolated) 496 result = exec_tests(linux, exec_request) 497 kunit_parser.print_with_timestamp(( 498 'Elapsed time: %.3fs\n') % (result.elapsed_time)) 499 if result.status != KunitStatus.SUCCESS: 500 sys.exit(1) 501 elif cli_args.subcommand == 'parse': 502 if cli_args.file == None: 503 sys.stdin.reconfigure(errors='backslashreplace') # pytype: disable=attribute-error 504 kunit_output = sys.stdin 505 else: 506 with open(cli_args.file, 'r', errors='backslashreplace') as f: 507 kunit_output = f.read().splitlines() 508 # We know nothing about how the result was created! 509 metadata = kunit_json.Metadata() 510 request = KunitParseRequest(raw_output=cli_args.raw_output, 511 json=cli_args.json) 512 result, _ = parse_tests(request, metadata, kunit_output) 513 if result.status != KunitStatus.SUCCESS: 514 sys.exit(1) 515 else: 516 parser.print_help() 517 518if __name__ == '__main__': 519 main(sys.argv[1:]) 520