1#!/usr/bin/env python3 2# SPDX-License-Identifier: GPL-2.0 3# 4# A thin wrapper on top of the KUnit Kernel 5# 6# Copyright (C) 2019, Google LLC. 7# Author: Felix Guo <felixguoxiuping@gmail.com> 8# Author: Brendan Higgins <brendanhiggins@google.com> 9 10import argparse 11import os 12import re 13import shlex 14import sys 15import time 16 17assert sys.version_info >= (3, 7), "Python version is too old" 18 19from dataclasses import dataclass 20from enum import Enum, auto 21from typing import Iterable, List, Optional, Sequence, Tuple 22 23import kunit_json 24import kunit_kernel 25import kunit_parser 26from kunit_printer import stdout, null_printer 27 28class KunitStatus(Enum): 29 SUCCESS = auto() 30 CONFIG_FAILURE = auto() 31 BUILD_FAILURE = auto() 32 TEST_FAILURE = auto() 33 34@dataclass 35class KunitResult: 36 status: KunitStatus 37 elapsed_time: float 38 39@dataclass 40class KunitConfigRequest: 41 build_dir: str 42 make_options: Optional[List[str]] 43 44@dataclass 45class KunitBuildRequest(KunitConfigRequest): 46 jobs: int 47 48@dataclass 49class KunitParseRequest: 50 raw_output: Optional[str] 51 json: Optional[str] 52 summary: bool 53 failed: bool 54 55@dataclass 56class KunitExecRequest(KunitParseRequest): 57 build_dir: str 58 timeout: int 59 filter_glob: str 60 filter: str 61 filter_action: Optional[str] 62 kernel_args: Optional[List[str]] 63 run_isolated: Optional[str] 64 list_tests: bool 65 list_tests_attr: bool 66 67@dataclass 68class KunitRequest(KunitExecRequest, KunitBuildRequest): 69 pass 70 71 72def get_kernel_root_path() -> str: 73 path = sys.argv[0] if not __file__ else __file__ 74 parts = os.path.realpath(path).split('tools/testing/kunit') 75 if len(parts) != 2: 76 sys.exit(1) 77 return parts[0] 78 79def config_tests(linux: kunit_kernel.LinuxSourceTree, 80 request: KunitConfigRequest) -> KunitResult: 81 stdout.print_with_timestamp('Configuring KUnit Kernel ...') 82 83 config_start = time.time() 84 success = linux.build_reconfig(request.build_dir, request.make_options) 85 config_end = time.time() 86 status = KunitStatus.SUCCESS if success else KunitStatus.CONFIG_FAILURE 87 return KunitResult(status, config_end - config_start) 88 89def build_tests(linux: kunit_kernel.LinuxSourceTree, 90 request: KunitBuildRequest) -> KunitResult: 91 stdout.print_with_timestamp('Building KUnit Kernel ...') 92 93 build_start = time.time() 94 success = linux.build_kernel(request.jobs, 95 request.build_dir, 96 request.make_options) 97 build_end = time.time() 98 status = KunitStatus.SUCCESS if success else KunitStatus.BUILD_FAILURE 99 return KunitResult(status, build_end - build_start) 100 101def config_and_build_tests(linux: kunit_kernel.LinuxSourceTree, 102 request: KunitBuildRequest) -> KunitResult: 103 config_result = config_tests(linux, request) 104 if config_result.status != KunitStatus.SUCCESS: 105 return config_result 106 107 return build_tests(linux, request) 108 109def _list_tests(linux: kunit_kernel.LinuxSourceTree, request: KunitExecRequest) -> List[str]: 110 args = ['kunit.action=list'] 111 112 if request.kernel_args: 113 args.extend(request.kernel_args) 114 115 output = linux.run_kernel(args=args, 116 timeout=request.timeout, 117 filter_glob=request.filter_glob, 118 filter=request.filter, 119 filter_action=request.filter_action, 120 build_dir=request.build_dir) 121 lines = kunit_parser.extract_tap_lines(output) 122 # Hack! Drop the dummy TAP version header that the executor prints out. 123 lines.pop() 124 125 # Filter out any extraneous non-test output that might have gotten mixed in. 126 return [l for l in output if re.match(r'^[^\s.]+\.[^\s.]+$', l)] 127 128def _list_tests_attr(linux: kunit_kernel.LinuxSourceTree, request: KunitExecRequest) -> Iterable[str]: 129 args = ['kunit.action=list_attr'] 130 131 if request.kernel_args: 132 args.extend(request.kernel_args) 133 134 output = linux.run_kernel(args=args, 135 timeout=request.timeout, 136 filter_glob=request.filter_glob, 137 filter=request.filter, 138 filter_action=request.filter_action, 139 build_dir=request.build_dir) 140 lines = kunit_parser.extract_tap_lines(output) 141 # Hack! Drop the dummy TAP version header that the executor prints out. 142 lines.pop() 143 144 # Filter out any extraneous non-test output that might have gotten mixed in. 145 return lines 146 147def _suites_from_test_list(tests: List[str]) -> List[str]: 148 """Extracts all the suites from an ordered list of tests.""" 149 suites = [] # type: List[str] 150 for t in tests: 151 parts = t.split('.', maxsplit=2) 152 if len(parts) != 2: 153 raise ValueError(f'internal KUnit error, test name should be of the form "<suite>.<test>", got "{t}"') 154 suite, _ = parts 155 if not suites or suites[-1] != suite: 156 suites.append(suite) 157 return suites 158 159def exec_tests(linux: kunit_kernel.LinuxSourceTree, request: KunitExecRequest) -> KunitResult: 160 filter_globs = [request.filter_glob] 161 if request.list_tests: 162 output = _list_tests(linux, request) 163 for line in output: 164 print(line.rstrip()) 165 return KunitResult(status=KunitStatus.SUCCESS, elapsed_time=0.0) 166 if request.list_tests_attr: 167 attr_output = _list_tests_attr(linux, request) 168 for line in attr_output: 169 print(line.rstrip()) 170 return KunitResult(status=KunitStatus.SUCCESS, elapsed_time=0.0) 171 if request.run_isolated: 172 tests = _list_tests(linux, request) 173 if request.run_isolated == 'test': 174 filter_globs = tests 175 elif request.run_isolated == 'suite': 176 filter_globs = _suites_from_test_list(tests) 177 # Apply the test-part of the user's glob, if present. 178 if '.' in request.filter_glob: 179 test_glob = request.filter_glob.split('.', maxsplit=2)[1] 180 filter_globs = [g + '.'+ test_glob for g in filter_globs] 181 182 metadata = kunit_json.Metadata(arch=linux.arch(), build_dir=request.build_dir, def_config='kunit_defconfig') 183 184 test_counts = kunit_parser.TestCounts() 185 exec_time = 0.0 186 for i, filter_glob in enumerate(filter_globs): 187 stdout.print_with_timestamp('Starting KUnit Kernel ({}/{})...'.format(i+1, len(filter_globs))) 188 189 test_start = time.time() 190 run_result = linux.run_kernel( 191 args=request.kernel_args, 192 timeout=request.timeout, 193 filter_glob=filter_glob, 194 filter=request.filter, 195 filter_action=request.filter_action, 196 build_dir=request.build_dir) 197 198 _, test_result = parse_tests(request, metadata, run_result) 199 # run_kernel() doesn't block on the kernel exiting. 200 # That only happens after we get the last line of output from `run_result`. 201 # So exec_time here actually contains parsing + execution time, which is fine. 202 test_end = time.time() 203 exec_time += test_end - test_start 204 205 test_counts.add_subtest_counts(test_result.counts) 206 207 if len(filter_globs) == 1 and test_counts.crashed > 0: 208 bd = request.build_dir 209 print('The kernel seems to have crashed; you can decode the stack traces with:') 210 print('$ scripts/decode_stacktrace.sh {}/vmlinux {} < {} | tee {}/decoded.log | {} parse'.format( 211 bd, bd, kunit_kernel.get_outfile_path(bd), bd, sys.argv[0])) 212 213 kunit_status = _map_to_overall_status(test_counts.get_status()) 214 return KunitResult(status=kunit_status, elapsed_time=exec_time) 215 216def _map_to_overall_status(test_status: kunit_parser.TestStatus) -> KunitStatus: 217 if test_status in (kunit_parser.TestStatus.SUCCESS, kunit_parser.TestStatus.SKIPPED): 218 return KunitStatus.SUCCESS 219 return KunitStatus.TEST_FAILURE 220 221def parse_tests(request: KunitParseRequest, metadata: kunit_json.Metadata, input_data: Iterable[str]) -> Tuple[KunitResult, kunit_parser.Test]: 222 parse_start = time.time() 223 224 if request.raw_output: 225 # Treat unparsed results as one passing test. 226 fake_test = kunit_parser.Test() 227 fake_test.status = kunit_parser.TestStatus.SUCCESS 228 fake_test.counts.passed = 1 229 230 output: Iterable[str] = input_data 231 if request.raw_output == 'all' or request.raw_output == 'full': 232 pass 233 elif request.raw_output == 'kunit': 234 output = kunit_parser.extract_tap_lines(output) 235 for line in output: 236 print(line.rstrip()) 237 parse_time = time.time() - parse_start 238 return KunitResult(KunitStatus.SUCCESS, parse_time), fake_test 239 240 default_printer = stdout 241 if request.summary or request.failed: 242 default_printer = null_printer 243 244 # Actually parse the test results. 245 test = kunit_parser.parse_run_tests(input_data, default_printer) 246 parse_time = time.time() - parse_start 247 248 if request.failed: 249 kunit_parser.print_test(test, request.failed, stdout) 250 kunit_parser.print_summary_line(test, stdout) 251 252 if request.json: 253 json_str = kunit_json.get_json_result( 254 test=test, 255 metadata=metadata) 256 if request.json == 'stdout': 257 print(json_str) 258 else: 259 with open(request.json, 'w') as f: 260 f.write(json_str) 261 stdout.print_with_timestamp("Test results stored in %s" % 262 os.path.abspath(request.json)) 263 264 if test.status != kunit_parser.TestStatus.SUCCESS: 265 return KunitResult(KunitStatus.TEST_FAILURE, parse_time), test 266 267 return KunitResult(KunitStatus.SUCCESS, parse_time), test 268 269def run_tests(linux: kunit_kernel.LinuxSourceTree, 270 request: KunitRequest) -> KunitResult: 271 run_start = time.time() 272 273 config_result = config_tests(linux, request) 274 if config_result.status != KunitStatus.SUCCESS: 275 return config_result 276 277 build_result = build_tests(linux, request) 278 if build_result.status != KunitStatus.SUCCESS: 279 return build_result 280 281 exec_result = exec_tests(linux, request) 282 283 run_end = time.time() 284 285 stdout.print_with_timestamp(( 286 'Elapsed time: %.3fs total, %.3fs configuring, %.3fs ' + 287 'building, %.3fs running\n') % ( 288 run_end - run_start, 289 config_result.elapsed_time, 290 build_result.elapsed_time, 291 exec_result.elapsed_time)) 292 return exec_result 293 294# Problem: 295# $ kunit.py run --json 296# works as one would expect and prints the parsed test results as JSON. 297# $ kunit.py run --json suite_name 298# would *not* pass suite_name as the filter_glob and print as json. 299# argparse will consider it to be another way of writing 300# $ kunit.py run --json=suite_name 301# i.e. it would run all tests, and dump the json to a `suite_name` file. 302# So we hackily automatically rewrite --json => --json=stdout 303pseudo_bool_flag_defaults = { 304 '--json': 'stdout', 305 '--raw_output': 'kunit', 306} 307def massage_argv(argv: Sequence[str]) -> Sequence[str]: 308 def massage_arg(arg: str) -> str: 309 if arg not in pseudo_bool_flag_defaults: 310 return arg 311 return f'{arg}={pseudo_bool_flag_defaults[arg]}' 312 return list(map(massage_arg, argv)) 313 314def get_default_jobs() -> int: 315 if sys.version_info >= (3, 13): 316 if (ncpu := os.process_cpu_count()) is not None: 317 return ncpu 318 raise RuntimeError("os.process_cpu_count() returned None") 319 # See https://github.com/python/cpython/blob/b61fece/Lib/os.py#L1175-L1186. 320 if sys.platform != "darwin": 321 return len(os.sched_getaffinity(0)) 322 if (ncpu := os.cpu_count()) is not None: 323 return ncpu 324 raise RuntimeError("os.cpu_count() returned None") 325 326def get_default_build_dir() -> str: 327 if 'KBUILD_OUTPUT' in os.environ: 328 return os.path.join(os.environ['KBUILD_OUTPUT'], '.kunit') 329 return '.kunit' 330 331def add_common_opts(parser: argparse.ArgumentParser) -> None: 332 parser.add_argument('--build_dir', 333 help='As in the make command, it specifies the build ' 334 'directory.', 335 type=str, default=get_default_build_dir(), metavar='DIR') 336 parser.add_argument('--make_options', 337 help='X=Y make option, can be repeated.', 338 action='append', metavar='X=Y') 339 parser.add_argument('--alltests', 340 help='Run all KUnit tests via tools/testing/kunit/configs/all_tests.config', 341 action='store_true') 342 parser.add_argument('--kunitconfig', 343 help='Path to Kconfig fragment that enables KUnit tests.' 344 ' If given a directory, (e.g. lib/kunit), "/.kunitconfig" ' 345 'will get automatically appended. If repeated, the files ' 346 'blindly concatenated, which might not work in all cases.', 347 action='append', metavar='PATHS') 348 parser.add_argument('--kconfig_add', 349 help='Additional Kconfig options to append to the ' 350 '.kunitconfig, e.g. CONFIG_KASAN=y. Can be repeated.', 351 action='append', metavar='CONFIG_X=Y') 352 353 parser.add_argument('--arch', 354 help=('Specifies the architecture to run tests under. ' 355 'The architecture specified here must match the ' 356 'string passed to the ARCH make param, ' 357 'e.g. i386, x86_64, arm, um, etc. Non-UML ' 358 'architectures run on QEMU.'), 359 type=str, default='um', metavar='ARCH') 360 361 parser.add_argument('--cross_compile', 362 help=('Sets make\'s CROSS_COMPILE variable; it should ' 363 'be set to a toolchain path prefix (the prefix ' 364 'of gcc and other tools in your toolchain, for ' 365 'example `sparc64-linux-gnu-` if you have the ' 366 'sparc toolchain installed on your system, or ' 367 '`$HOME/toolchains/microblaze/gcc-9.2.0-nolibc/microblaze-linux/bin/microblaze-linux-` ' 368 'if you have downloaded the microblaze toolchain ' 369 'from the 0-day website to a directory in your ' 370 'home directory called `toolchains`).'), 371 metavar='PREFIX') 372 373 parser.add_argument('--qemu_config', 374 help=('Takes a path to a path to a file containing ' 375 'a QemuArchParams object.'), 376 type=str, metavar='FILE') 377 378 parser.add_argument('--qemu_args', 379 help='Additional QEMU arguments, e.g. "-smp 8"', 380 action='append', metavar='') 381 382def add_build_opts(parser: argparse.ArgumentParser) -> None: 383 parser.add_argument('--jobs', 384 help='As in the make command, "Specifies the number of ' 385 'jobs (commands) to run simultaneously."', 386 type=int, default=get_default_jobs(), metavar='N') 387 388def add_exec_opts(parser: argparse.ArgumentParser) -> None: 389 parser.add_argument('--timeout', 390 help='maximum number of seconds to allow for all tests ' 391 'to run. This does not include time taken to build the ' 392 'tests.', 393 type=int, 394 default=300, 395 metavar='SECONDS') 396 parser.add_argument('filter_glob', 397 help='Filter which KUnit test suites/tests run at ' 398 'boot-time, e.g. list* or list*.*del_test', 399 type=str, 400 nargs='?', 401 default='', 402 metavar='filter_glob') 403 parser.add_argument('--filter', 404 help='Filter KUnit tests with attributes, ' 405 'e.g. module=example or speed>slow', 406 type=str, 407 default='') 408 parser.add_argument('--filter_action', 409 help='If set to skip, filtered tests will be skipped, ' 410 'e.g. --filter_action=skip. Otherwise they will not run.', 411 type=str, 412 choices=['skip']) 413 parser.add_argument('--kernel_args', 414 help='Kernel command-line parameters. Maybe be repeated', 415 action='append', metavar='') 416 parser.add_argument('--run_isolated', help='If set, boot the kernel for each ' 417 'individual suite/test. This is can be useful for debugging ' 418 'a non-hermetic test, one that might pass/fail based on ' 419 'what ran before it.', 420 type=str, 421 choices=['suite', 'test']) 422 parser.add_argument('--list_tests', help='If set, list all tests that will be ' 423 'run.', 424 action='store_true') 425 parser.add_argument('--list_tests_attr', help='If set, list all tests and test ' 426 'attributes.', 427 action='store_true') 428 429def add_parse_opts(parser: argparse.ArgumentParser) -> None: 430 parser.add_argument('--raw_output', help='If set don\'t parse output from kernel. ' 431 'By default, filters to just KUnit output. Use ' 432 '--raw_output=all to show everything', 433 type=str, nargs='?', const='all', default=None, choices=['all', 'full', 'kunit']) 434 parser.add_argument('--json', 435 nargs='?', 436 help='Prints parsed test results as JSON to stdout or a file if ' 437 'a filename is specified. Does nothing if --raw_output is set.', 438 type=str, const='stdout', default=None, metavar='FILE') 439 parser.add_argument('--summary', 440 help='Prints only the summary line for parsed test results.' 441 'Does nothing if --raw_output is set.', 442 action='store_true') 443 parser.add_argument('--failed', 444 help='Prints only the failed parsed test results and summary line.' 445 'Does nothing if --raw_output is set.', 446 action='store_true') 447 448 449def tree_from_args(cli_args: argparse.Namespace) -> kunit_kernel.LinuxSourceTree: 450 """Returns a LinuxSourceTree based on the user's arguments.""" 451 # Allow users to specify multiple arguments in one string, e.g. '-smp 8' 452 qemu_args: List[str] = [] 453 if cli_args.qemu_args: 454 for arg in cli_args.qemu_args: 455 qemu_args.extend(shlex.split(arg)) 456 457 kunitconfigs = cli_args.kunitconfig if cli_args.kunitconfig else [] 458 if cli_args.alltests: 459 # Prepend so user-specified options take prio if we ever allow 460 # --kunitconfig options to have differing options. 461 kunitconfigs = [kunit_kernel.ALL_TESTS_CONFIG_PATH] + kunitconfigs 462 463 return kunit_kernel.LinuxSourceTree(cli_args.build_dir, 464 kunitconfig_paths=kunitconfigs, 465 kconfig_add=cli_args.kconfig_add, 466 arch=cli_args.arch, 467 cross_compile=cli_args.cross_compile, 468 qemu_config_path=cli_args.qemu_config, 469 extra_qemu_args=qemu_args) 470 471 472def run_handler(cli_args: argparse.Namespace) -> None: 473 if not os.path.exists(cli_args.build_dir): 474 os.mkdir(cli_args.build_dir) 475 476 linux = tree_from_args(cli_args) 477 request = KunitRequest(build_dir=cli_args.build_dir, 478 make_options=cli_args.make_options, 479 jobs=cli_args.jobs, 480 raw_output=cli_args.raw_output, 481 json=cli_args.json, 482 summary=cli_args.summary, 483 failed=cli_args.failed, 484 timeout=cli_args.timeout, 485 filter_glob=cli_args.filter_glob, 486 filter=cli_args.filter, 487 filter_action=cli_args.filter_action, 488 kernel_args=cli_args.kernel_args, 489 run_isolated=cli_args.run_isolated, 490 list_tests=cli_args.list_tests, 491 list_tests_attr=cli_args.list_tests_attr) 492 result = run_tests(linux, request) 493 if result.status != KunitStatus.SUCCESS: 494 sys.exit(1) 495 496 497def config_handler(cli_args: argparse.Namespace) -> None: 498 if cli_args.build_dir and ( 499 not os.path.exists(cli_args.build_dir)): 500 os.mkdir(cli_args.build_dir) 501 502 linux = tree_from_args(cli_args) 503 request = KunitConfigRequest(build_dir=cli_args.build_dir, 504 make_options=cli_args.make_options) 505 result = config_tests(linux, request) 506 stdout.print_with_timestamp(( 507 'Elapsed time: %.3fs\n') % ( 508 result.elapsed_time)) 509 if result.status != KunitStatus.SUCCESS: 510 sys.exit(1) 511 512 513def build_handler(cli_args: argparse.Namespace) -> None: 514 linux = tree_from_args(cli_args) 515 request = KunitBuildRequest(build_dir=cli_args.build_dir, 516 make_options=cli_args.make_options, 517 jobs=cli_args.jobs) 518 result = config_and_build_tests(linux, request) 519 stdout.print_with_timestamp(( 520 'Elapsed time: %.3fs\n') % ( 521 result.elapsed_time)) 522 if result.status != KunitStatus.SUCCESS: 523 sys.exit(1) 524 525 526def exec_handler(cli_args: argparse.Namespace) -> None: 527 linux = tree_from_args(cli_args) 528 exec_request = KunitExecRequest(raw_output=cli_args.raw_output, 529 build_dir=cli_args.build_dir, 530 json=cli_args.json, 531 summary=cli_args.summary, 532 failed=cli_args.failed, 533 timeout=cli_args.timeout, 534 filter_glob=cli_args.filter_glob, 535 filter=cli_args.filter, 536 filter_action=cli_args.filter_action, 537 kernel_args=cli_args.kernel_args, 538 run_isolated=cli_args.run_isolated, 539 list_tests=cli_args.list_tests, 540 list_tests_attr=cli_args.list_tests_attr) 541 result = exec_tests(linux, exec_request) 542 stdout.print_with_timestamp(( 543 'Elapsed time: %.3fs\n') % (result.elapsed_time)) 544 if result.status != KunitStatus.SUCCESS: 545 sys.exit(1) 546 547 548def parse_handler(cli_args: argparse.Namespace) -> None: 549 if cli_args.file is None: 550 sys.stdin.reconfigure(errors='backslashreplace') # type: ignore 551 kunit_output = sys.stdin # type: Iterable[str] 552 else: 553 with open(cli_args.file, 'r', errors='backslashreplace') as f: 554 kunit_output = f.read().splitlines() 555 # We know nothing about how the result was created! 556 metadata = kunit_json.Metadata() 557 request = KunitParseRequest(raw_output=cli_args.raw_output, 558 json=cli_args.json, summary=cli_args.summary, 559 failed=cli_args.failed) 560 result, _ = parse_tests(request, metadata, kunit_output) 561 if result.status != KunitStatus.SUCCESS: 562 sys.exit(1) 563 564 565subcommand_handlers_map = { 566 'run': run_handler, 567 'config': config_handler, 568 'build': build_handler, 569 'exec': exec_handler, 570 'parse': parse_handler 571} 572 573 574def main(argv: Sequence[str]) -> None: 575 parser = argparse.ArgumentParser( 576 description='Helps writing and running KUnit tests.') 577 subparser = parser.add_subparsers(dest='subcommand') 578 579 # The 'run' command will config, build, exec, and parse in one go. 580 run_parser = subparser.add_parser('run', help='Runs KUnit tests.') 581 add_common_opts(run_parser) 582 add_build_opts(run_parser) 583 add_exec_opts(run_parser) 584 add_parse_opts(run_parser) 585 586 config_parser = subparser.add_parser('config', 587 help='Ensures that .config contains all of ' 588 'the options in .kunitconfig') 589 add_common_opts(config_parser) 590 591 build_parser = subparser.add_parser('build', help='Builds a kernel with KUnit tests') 592 add_common_opts(build_parser) 593 add_build_opts(build_parser) 594 595 exec_parser = subparser.add_parser('exec', help='Run a kernel with KUnit tests') 596 add_common_opts(exec_parser) 597 add_exec_opts(exec_parser) 598 add_parse_opts(exec_parser) 599 600 # The 'parse' option is special, as it doesn't need the kernel source 601 # (therefore there is no need for a build_dir, hence no add_common_opts) 602 # and the '--file' argument is not relevant to 'run', so isn't in 603 # add_parse_opts() 604 parse_parser = subparser.add_parser('parse', 605 help='Parses KUnit results from a file, ' 606 'and parses formatted results.') 607 add_parse_opts(parse_parser) 608 parse_parser.add_argument('file', 609 help='Specifies the file to read results from.', 610 type=str, nargs='?', metavar='input_file') 611 612 cli_args = parser.parse_args(massage_argv(argv)) 613 614 if get_kernel_root_path(): 615 os.chdir(get_kernel_root_path()) 616 617 subcomand_handler = subcommand_handlers_map.get(cli_args.subcommand, None) 618 619 if subcomand_handler is None: 620 parser.print_help() 621 return 622 623 subcomand_handler(cli_args) 624 625 626if __name__ == '__main__': 627 main(sys.argv[1:]) 628