xref: /linux/tools/testing/kunit/kunit.py (revision 7f4f3b14e8079ecde096bd734af10e30d40c27b7)
1#!/usr/bin/env python3
2# SPDX-License-Identifier: GPL-2.0
3#
4# A thin wrapper on top of the KUnit Kernel
5#
6# Copyright (C) 2019, Google LLC.
7# Author: Felix Guo <felixguoxiuping@gmail.com>
8# Author: Brendan Higgins <brendanhiggins@google.com>
9
10import argparse
11import os
12import re
13import shlex
14import sys
15import time
16
# Refuse to run on interpreters older than 3.7 with a readable message; the
# dataclasses module imported below is not available before that release.
assert sys.version_info >= (3, 7), "Python version is too old"
18
19from dataclasses import dataclass
20from enum import Enum, auto
21from typing import Iterable, List, Optional, Sequence, Tuple
22
23import kunit_json
24import kunit_kernel
25import kunit_parser
26from kunit_printer import stdout, null_printer
27
class KunitStatus(Enum):
	"""Overall outcome of a kunit.py step (config, build, exec or parse)."""
	SUCCESS = auto()
	# Produced by config_tests() when build_reconfig() reports failure.
	CONFIG_FAILURE = auto()
	# Produced by build_tests() when build_kernel() reports failure.
	BUILD_FAILURE = auto()
	# Produced when the parsed/aggregated test status is not a passing one.
	TEST_FAILURE = auto()
33
@dataclass
class KunitResult:
	"""Outcome of one kunit.py step together with how long it took."""
	status: KunitStatus
	# Duration in seconds, computed from time.time() differences.
	elapsed_time: float
38
@dataclass
class KunitConfigRequest:
	"""Inputs needed to configure the kernel (consumed by config_tests())."""
	# Build directory handed to LinuxSourceTree.build_reconfig().
	build_dir: str
	# Extra "X=Y" make options from --make_options; None when not given.
	make_options: Optional[List[str]]
43
@dataclass
class KunitBuildRequest(KunitConfigRequest):
	"""Config inputs plus what build_tests() needs to build the kernel."""
	# Number of jobs passed to LinuxSourceTree.build_kernel() (from --jobs).
	jobs: int
47
@dataclass
class KunitParseRequest:
	"""Options controlling how parse_tests() handles kernel output."""
	# When set, output is printed unparsed: 'all' prints everything, 'kunit'
	# first narrows it with kunit_parser.extract_tap_lines().
	raw_output: Optional[str]
	# 'stdout' prints the JSON results; any other value is a file path to
	# write them to. None disables JSON output.
	json: Optional[str]
	# If True, the per-test output is suppressed and only the summary line
	# is printed.
	summary: bool
	# If True, the per-test output is suppressed and kunit_parser.print_test()
	# is invoked with this flag before the summary line.
	failed: bool
54
@dataclass
class KunitExecRequest(KunitParseRequest):
	"""Parse options plus what exec_tests() needs to run the kernel."""
	# Build directory passed to run_kernel() and recorded in the JSON metadata.
	build_dir: str
	# Passed to run_kernel(); --timeout documents it as a number of seconds.
	timeout: int
	# Suite/test glob passed to run_kernel(), e.g. 'list*' or 'list*.*del_test'.
	filter_glob: str
	# Attribute filter passed to run_kernel(), e.g. 'module=example'.
	filter: str
	# 'skip' (the only choice --filter_action accepts) or None.
	filter_action: Optional[str]
	# Extra kernel command-line parameters; None when not given.
	kernel_args: Optional[List[str]]
	# 'suite' or 'test' to boot the kernel once per suite/test; None to run
	# everything in a single boot.
	run_isolated: Optional[str]
	# If True, only print the names of the tests instead of running them.
	list_tests: bool
	# If True, only print the tests together with their attributes.
	list_tests_attr: bool
66
@dataclass
class KunitRequest(KunitExecRequest, KunitBuildRequest):
	"""Union of the build and exec requests, as consumed by run_tests()."""
	pass
70
71
def get_kernel_root_path() -> str:
	"""Returns the kernel tree root, derived from this script's location.

	The resolved path of this file (or of sys.argv[0] when __file__ is
	empty) is cut at 'tools/testing/kunit'; whatever precedes that marker
	is the root. Exits with status 1 unless the marker splits the path
	into exactly two pieces.
	"""
	script = __file__ if __file__ else sys.argv[0]
	resolved = os.path.realpath(script)
	pieces = resolved.split('tools/testing/kunit')
	if len(pieces) == 2:
		return pieces[0]
	sys.exit(1)
78
def config_tests(linux: kunit_kernel.LinuxSourceTree,
		 request: KunitConfigRequest) -> KunitResult:
	"""Reconfigures the kernel and reports whether that succeeded.

	Calls linux.build_reconfig() with the request's build directory and make
	options. The result carries CONFIG_FAILURE when that call returns a
	falsy value, together with the time the call took.
	"""
	stdout.print_with_timestamp('Configuring KUnit Kernel ...')

	started = time.time()
	configured = linux.build_reconfig(request.build_dir, request.make_options)
	elapsed = time.time() - started

	if configured:
		return KunitResult(KunitStatus.SUCCESS, elapsed)
	return KunitResult(KunitStatus.CONFIG_FAILURE, elapsed)
88
def build_tests(linux: kunit_kernel.LinuxSourceTree,
		request: KunitBuildRequest) -> KunitResult:
	"""Builds the kernel and reports whether that succeeded.

	Calls linux.build_kernel() with the request's job count, build directory
	and make options. The result carries BUILD_FAILURE when that call
	returns a falsy value, together with the time the call took.
	"""
	stdout.print_with_timestamp('Building KUnit Kernel ...')

	started = time.time()
	built = linux.build_kernel(request.jobs, request.build_dir, request.make_options)
	elapsed = time.time() - started

	if built:
		return KunitResult(KunitStatus.SUCCESS, elapsed)
	return KunitResult(KunitStatus.BUILD_FAILURE, elapsed)
100
def config_and_build_tests(linux: kunit_kernel.LinuxSourceTree,
			   request: KunitBuildRequest) -> KunitResult:
	"""Configures the kernel and, only if that succeeds, builds it.

	Returns the config result when configuration fails; otherwise returns
	the build result (whose elapsed time covers the build step alone).
	"""
	config_result = config_tests(linux, request)
	if config_result.status == KunitStatus.SUCCESS:
		return build_tests(linux, request)
	return config_result
108
109def _list_tests(linux: kunit_kernel.LinuxSourceTree, request: KunitExecRequest) -> List[str]:
110	args = ['kunit.action=list']
111
112	if request.kernel_args:
113		args.extend(request.kernel_args)
114
115	output = linux.run_kernel(args=args,
116			   timeout=request.timeout,
117			   filter_glob=request.filter_glob,
118			   filter=request.filter,
119			   filter_action=request.filter_action,
120			   build_dir=request.build_dir)
121	lines = kunit_parser.extract_tap_lines(output)
122	# Hack! Drop the dummy TAP version header that the executor prints out.
123	lines.pop()
124
125	# Filter out any extraneous non-test output that might have gotten mixed in.
126	return [l for l in output if re.match(r'^[^\s.]+\.[^\s.]+$', l)]
127
def _list_tests_attr(linux: kunit_kernel.LinuxSourceTree, request: KunitExecRequest) -> Iterable[str]:
	"""Boots the kernel with kunit.action=list_attr and returns its KUnit output.

	The request's kernel args (if any) are appended after the action
	argument; timeout, filters and build directory are forwarded to
	run_kernel() unchanged.
	"""
	kernel_args = ['kunit.action=list_attr']
	kernel_args += request.kernel_args or []

	raw_output = linux.run_kernel(
		args=kernel_args,
		timeout=request.timeout,
		filter_glob=request.filter_glob,
		filter=request.filter,
		filter_action=request.filter_action,
		build_dir=request.build_dir)

	tap_lines = kunit_parser.extract_tap_lines(raw_output)
	# Hack! The first extracted line is the dummy TAP version header that the
	# executor prints out; discard it.
	tap_lines.pop()

	# Everything that remains is returned as-is; no further filtering is done.
	return tap_lines
146
147def _suites_from_test_list(tests: List[str]) -> List[str]:
148	"""Extracts all the suites from an ordered list of tests."""
149	suites = []  # type: List[str]
150	for t in tests:
151		parts = t.split('.', maxsplit=2)
152		if len(parts) != 2:
153			raise ValueError(f'internal KUnit error, test name should be of the form "<suite>.<test>", got "{t}"')
154		suite, _ = parts
155		if not suites or suites[-1] != suite:
156			suites.append(suite)
157	return suites
158
def exec_tests(linux: kunit_kernel.LinuxSourceTree, request: KunitExecRequest) -> KunitResult:
	"""Runs the requested tests (or only lists them) and aggregates the outcome.

	With list_tests / list_tests_attr set, the listing is printed and a
	SUCCESS result with zero elapsed time is returned without running
	anything. Otherwise the kernel is booted once per filter glob: a single
	boot by default, or one per test/suite when run_isolated is set. The
	counts of every boot are summed into the overall status.
	"""
	filter_globs = [request.filter_glob]
	if request.list_tests:
		for test_name in _list_tests(linux, request):
			print(test_name.rstrip())
		return KunitResult(status=KunitStatus.SUCCESS, elapsed_time=0.0)
	if request.list_tests_attr:
		for attr_line in _list_tests_attr(linux, request):
			print(attr_line.rstrip())
		return KunitResult(status=KunitStatus.SUCCESS, elapsed_time=0.0)
	if request.run_isolated:
		all_tests = _list_tests(linux, request)
		if request.run_isolated == 'test':
			filter_globs = all_tests
		elif request.run_isolated == 'suite':
			filter_globs = _suites_from_test_list(all_tests)
			# Re-attach the test half of the user's glob, if there is one.
			if '.' in request.filter_glob:
				test_glob = request.filter_glob.split('.', maxsplit=2)[1]
				filter_globs = [f'{suite}.{test_glob}' for suite in filter_globs]

	metadata = kunit_json.Metadata(arch=linux.arch(), build_dir=request.build_dir, def_config='kunit_defconfig')

	totals = kunit_parser.TestCounts()
	exec_time = 0.0
	num_runs = len(filter_globs)
	for run_number, glob in enumerate(filter_globs, start=1):
		stdout.print_with_timestamp(f'Starting KUnit Kernel ({run_number}/{num_runs})...')

		started = time.time()
		run_result = linux.run_kernel(
			args=request.kernel_args,
			timeout=request.timeout,
			filter_glob=glob,
			filter=request.filter,
			filter_action=request.filter_action,
			build_dir=request.build_dir)

		_, test_result = parse_tests(request, metadata, run_result)
		# run_kernel() returns before the kernel exits; the kernel is only
		# known to be done once the last line of `run_result` was consumed by
		# the parser. The measured time is thus execution + parsing, which
		# is fine.
		exec_time += time.time() - started

		totals.add_subtest_counts(test_result.counts)

	if num_runs == 1 and totals.crashed > 0:
		bd = request.build_dir
		print('The kernel seems to have crashed; you can decode the stack traces with:')
		print(f'$ scripts/decode_stacktrace.sh {bd}/vmlinux {bd} < {kunit_kernel.get_outfile_path(bd)} | tee {bd}/decoded.log | {sys.argv[0]} parse')

	return KunitResult(status=_map_to_overall_status(totals.get_status()),
			   elapsed_time=exec_time)
215
def _map_to_overall_status(test_status: kunit_parser.TestStatus) -> KunitStatus:
	"""Maps a parser status to SUCCESS (passed or skipped) or TEST_FAILURE."""
	passing = (kunit_parser.TestStatus.SUCCESS, kunit_parser.TestStatus.SKIPPED)
	return KunitStatus.SUCCESS if test_status in passing else KunitStatus.TEST_FAILURE
220
def parse_tests(request: KunitParseRequest, metadata: kunit_json.Metadata, input_data: Iterable[str]) -> Tuple[KunitResult, kunit_parser.Test]:
	"""Parses (or merely echoes) kernel output and reports the outcome.

	With request.raw_output set, the output is printed unparsed ('kunit'
	first narrows it via extract_tap_lines()) and a synthetic single passing
	test is returned. Otherwise the output is parsed, a summary line is
	printed and, when request.json is set, the results are emitted as JSON
	to stdout or to the named file.

	Returns a (KunitResult, Test) pair. The result is TEST_FAILURE unless
	the parsed status is exactly SUCCESS; its elapsed time covers printing
	or parsing only.
	"""
	started = time.time()

	if request.raw_output:
		# Unparsed results are reported as one passing test.
		placeholder = kunit_parser.Test()
		placeholder.status = kunit_parser.TestStatus.SUCCESS
		placeholder.counts.passed = 1

		# 'all' echoes the input untouched; 'kunit' keeps the KUnit part only.
		raw_lines: Iterable[str] = input_data
		if request.raw_output == 'kunit':
			raw_lines = kunit_parser.extract_tap_lines(raw_lines)
		for raw_line in raw_lines:
			print(raw_line.rstrip())
		return KunitResult(KunitStatus.SUCCESS, time.time() - started), placeholder

	# --summary and --failed both silence the per-test output.
	quiet = request.summary or request.failed
	printer = null_printer if quiet else stdout

	# Actually parse the test results.
	test = kunit_parser.parse_run_tests(input_data, printer)
	parse_time = time.time() - started

	if request.failed:
		kunit_parser.print_test(test, request.failed, stdout)
	kunit_parser.print_summary_line(test, stdout)

	if request.json:
		json_str = kunit_json.get_json_result(test=test, metadata=metadata)
		if request.json == 'stdout':
			print(json_str)
		else:
			with open(request.json, 'w') as f:
				f.write(json_str)
			stdout.print_with_timestamp(
				f'Test results stored in {os.path.abspath(request.json)}')

	if test.status == kunit_parser.TestStatus.SUCCESS:
		overall = KunitStatus.SUCCESS
	else:
		overall = KunitStatus.TEST_FAILURE
	return KunitResult(overall, parse_time), test
268
def run_tests(linux: kunit_kernel.LinuxSourceTree,
	      request: KunitRequest) -> KunitResult:
	"""Configures, builds and executes the tests, stopping at the first failure.

	A failing config or build result is returned immediately. Otherwise the
	exec result is returned after printing a breakdown of the time spent in
	each phase.
	"""
	started = time.time()

	config_result = config_tests(linux, request)
	if config_result.status != KunitStatus.SUCCESS:
		return config_result

	build_result = build_tests(linux, request)
	if build_result.status != KunitStatus.SUCCESS:
		return build_result

	exec_result = exec_tests(linux, request)

	total = time.time() - started

	stdout.print_with_timestamp(
		f'Elapsed time: {total:.3f}s total, '
		f'{config_result.elapsed_time:.3f}s configuring, '
		f'{build_result.elapsed_time:.3f}s building, '
		f'{exec_result.elapsed_time:.3f}s running\n')
	return exec_result
293
294# Problem:
295# $ kunit.py run --json
296# works as one would expect and prints the parsed test results as JSON.
297# $ kunit.py run --json suite_name
298# would *not* pass suite_name as the filter_glob and print as json.
299# argparse will consider it to be another way of writing
300# $ kunit.py run --json=suite_name
301# i.e. it would run all tests, and dump the json to a `suite_name` file.
302# So we hackily automatically rewrite --json => --json=stdout
# Flags that take an optional value, mapped to the value a bare occurrence
# of the flag is rewritten to.
pseudo_bool_flag_defaults = {
		'--json': 'stdout',
		'--raw_output': 'kunit',
}
def massage_argv(argv: Sequence[str]) -> Sequence[str]:
	"""Rewrites bare pseudo-boolean flags into their explicit `flag=value` form.

	Arguments that are not exactly one of the keys of
	pseudo_bool_flag_defaults are passed through untouched, so a flag that
	already carries a value (e.g. --json=file) is left alone.
	"""
	return [
		f'{arg}={pseudo_bool_flag_defaults[arg]}'
		if arg in pseudo_bool_flag_defaults else arg
		for arg in argv
	]
313
def get_default_jobs() -> int:
	"""Returns the default for --jobs: the number of CPUs usable by this process.

	Prefers the size of the process's CPU affinity set. Where
	os.sched_getaffinity() does not exist (it is only provided on some Unix
	platforms) the total CPU count is used instead, or 1 if even that is
	unknown. The fallback matters because this function is evaluated while
	the argument parsers are being built, so a missing attribute would
	otherwise break every subcommand, not just the build.
	"""
	try:
		return len(os.sched_getaffinity(0))
	except AttributeError:
		return os.cpu_count() or 1
316
def add_common_opts(parser: argparse.ArgumentParser) -> None:
	"""Registers the options shared by every subcommand that needs a kernel tree.

	Adds --build_dir, --make_options, --alltests, --kunitconfig,
	--kconfig_add, --arch, --cross_compile, --qemu_config and --qemu_args to
	`parser`. Repeatable options use action='append' and therefore default
	to None when absent.
	"""
	parser.add_argument('--build_dir',
			    help='As in the make command, it specifies the build '
			    'directory.',
			    type=str, default='.kunit', metavar='DIR')
	parser.add_argument('--make_options',
			    help='X=Y make option, can be repeated.',
			    action='append', metavar='X=Y')
	parser.add_argument('--alltests',
			    help='Run all KUnit tests via tools/testing/kunit/configs/all_tests.config',
			    action='store_true')
	parser.add_argument('--kunitconfig',
			    help='Path to Kconfig fragment that enables KUnit tests.'
			    ' If given a directory, (e.g. lib/kunit), "/.kunitconfig" '
			    'will get automatically appended. If repeated, the files '
			    'are blindly concatenated, which might not work in all cases.',
			    action='append', metavar='PATHS')
	parser.add_argument('--kconfig_add',
			    help='Additional Kconfig options to append to the '
			    '.kunitconfig, e.g. CONFIG_KASAN=y. Can be repeated.',
			    action='append', metavar='CONFIG_X=Y')

	parser.add_argument('--arch',
			    help=('Specifies the architecture to run tests under. '
				  'The architecture specified here must match the '
				  'string passed to the ARCH make param, '
				  'e.g. i386, x86_64, arm, um, etc. Non-UML '
				  'architectures run on QEMU.'),
			    type=str, default='um', metavar='ARCH')

	parser.add_argument('--cross_compile',
			    help=('Sets make\'s CROSS_COMPILE variable; it should '
				  'be set to a toolchain path prefix (the prefix '
				  'of gcc and other tools in your toolchain, for '
				  'example `sparc64-linux-gnu-` if you have the '
				  'sparc toolchain installed on your system, or '
				  '`$HOME/toolchains/microblaze/gcc-9.2.0-nolibc/microblaze-linux/bin/microblaze-linux-` '
				  'if you have downloaded the microblaze toolchain '
				  'from the 0-day website to a directory in your '
				  'home directory called `toolchains`).'),
			    metavar='PREFIX')

	parser.add_argument('--qemu_config',
			    help=('Takes a path to a file containing '
				  'a QemuArchParams object.'),
			    type=str, metavar='FILE')

	# Each occurrence may hold several QEMU arguments in one string; the
	# splitting happens later, in tree_from_args().
	parser.add_argument('--qemu_args',
			    help='Additional QEMU arguments, e.g. "-smp 8"',
			    action='append', metavar='')
367
def add_build_opts(parser: argparse.ArgumentParser) -> None:
	"""Registers the build-only option --jobs, defaulting to get_default_jobs()."""
	default_jobs = get_default_jobs()
	parser.add_argument(
		'--jobs',
		type=int,
		default=default_jobs,
		metavar='N',
		help='As in the make command, "Specifies  the number of '
		'jobs (commands) to run simultaneously."')
373
def add_exec_opts(parser: argparse.ArgumentParser) -> None:
	"""Registers the options that control how the built kernel is run.

	Adds --timeout, the optional positional filter_glob, --filter,
	--filter_action, --kernel_args, --run_isolated, --list_tests and
	--list_tests_attr to `parser`.
	"""
	parser.add_argument('--timeout',
			    help='maximum number of seconds to allow for all tests '
			    'to run. This does not include time taken to build the '
			    'tests.',
			    type=int,
			    default=300,
			    metavar='SECONDS')
	# Optional positional: an empty string means no glob was given.
	parser.add_argument('filter_glob',
			    help='Filter which KUnit test suites/tests run at '
			    'boot-time, e.g. list* or list*.*del_test',
			    type=str,
			    nargs='?',
			    default='',
			    metavar='filter_glob')
	parser.add_argument('--filter',
			    help='Filter KUnit tests with attributes, '
			    'e.g. module=example or speed>slow',
			    type=str,
			    default='')
	parser.add_argument('--filter_action',
			    help='If set to skip, filtered tests will be skipped, '
			    'e.g. --filter_action=skip. Otherwise they will not run.',
			    type=str,
			    choices=['skip'])
	parser.add_argument('--kernel_args',
			    help='Kernel command-line parameters. May be repeated',
			    action='append', metavar='')
	parser.add_argument('--run_isolated', help='If set, boot the kernel for each '
			    'individual suite/test. This can be useful for debugging '
			    'a non-hermetic test, one that might pass/fail based on '
			    'what ran before it.',
			    type=str,
			    choices=['suite', 'test'])
	parser.add_argument('--list_tests', help='If set, list all tests that will be '
			    'run.',
			    action='store_true')
	parser.add_argument('--list_tests_attr', help='If set, list all tests and test '
			    'attributes.',
			    action='store_true')
414
def add_parse_opts(parser: argparse.ArgumentParser) -> None:
	"""Registers the options that control how kernel output is parsed and shown.

	Adds --raw_output, --json, --summary and --failed to `parser`.
	--raw_output and --json take an optional value; given without one they
	become 'all' and 'stdout' respectively at this level.
	"""
	parser.add_argument('--raw_output', help='If set don\'t parse output from kernel. '
			    'By default, filters to just KUnit output. Use '
			    '--raw_output=all to show everything',
			    type=str, nargs='?', const='all', default=None, choices=['all', 'kunit'])
	parser.add_argument('--json',
			    nargs='?',
			    help='Prints parsed test results as JSON to stdout or a file if '
			    'a filename is specified. Does nothing if --raw_output is set.',
			    type=str, const='stdout', default=None, metavar='FILE')
	# The two sentences in each help text below are separate string
	# literals; the trailing space keeps them from running together.
	parser.add_argument('--summary',
			    help='Prints only the summary line for parsed test results. '
			    'Does nothing if --raw_output is set.',
			    action='store_true')
	parser.add_argument('--failed',
			    help='Prints only the failed parsed test results and summary line. '
			    'Does nothing if --raw_output is set.',
			    action='store_true')
433
434
def tree_from_args(cli_args: argparse.Namespace) -> kunit_kernel.LinuxSourceTree:
	"""Builds the LinuxSourceTree described by the parsed command line."""
	# A single --qemu_args value may bundle several arguments, e.g. '-smp 8';
	# split each one shell-style and flatten the result.
	qemu_args: List[str] = [
		token
		for bundle in (cli_args.qemu_args or [])
		for token in shlex.split(bundle)
	]

	kunitconfigs = cli_args.kunitconfig or []
	if cli_args.alltests:
		# The all-tests config goes first so that user-specified files take
		# priority should --kunitconfig files ever be allowed to carry
		# differing options.
		kunitconfigs = [kunit_kernel.ALL_TESTS_CONFIG_PATH] + kunitconfigs

	return kunit_kernel.LinuxSourceTree(
		cli_args.build_dir,
		kunitconfig_paths=kunitconfigs,
		kconfig_add=cli_args.kconfig_add,
		arch=cli_args.arch,
		cross_compile=cli_args.cross_compile,
		qemu_config_path=cli_args.qemu_config,
		extra_qemu_args=qemu_args)
456
457
def run_handler(cli_args: argparse.Namespace) -> None:
	"""Handles `run`: config, build, exec and parse; exits 1 unless all succeed.

	Creates the build directory first if it does not exist yet.
	"""
	build_dir = cli_args.build_dir
	if not os.path.exists(build_dir):
		os.mkdir(build_dir)

	linux = tree_from_args(cli_args)
	request = KunitRequest(
		build_dir=build_dir,
		make_options=cli_args.make_options,
		jobs=cli_args.jobs,
		raw_output=cli_args.raw_output,
		json=cli_args.json,
		summary=cli_args.summary,
		failed=cli_args.failed,
		timeout=cli_args.timeout,
		filter_glob=cli_args.filter_glob,
		filter=cli_args.filter,
		filter_action=cli_args.filter_action,
		kernel_args=cli_args.kernel_args,
		run_isolated=cli_args.run_isolated,
		list_tests=cli_args.list_tests,
		list_tests_attr=cli_args.list_tests_attr)

	outcome = run_tests(linux, request)
	if outcome.status != KunitStatus.SUCCESS:
		sys.exit(1)
481
482
def config_handler(cli_args: argparse.Namespace) -> None:
	"""Handles `config`: configures the kernel, prints the time taken, exits 1 on failure.

	Creates the build directory first if one was named and it does not
	exist yet.
	"""
	build_dir = cli_args.build_dir
	if build_dir and not os.path.exists(build_dir):
		os.mkdir(build_dir)

	linux = tree_from_args(cli_args)
	request = KunitConfigRequest(build_dir=build_dir,
				     make_options=cli_args.make_options)
	outcome = config_tests(linux, request)
	stdout.print_with_timestamp(f'Elapsed time: {outcome.elapsed_time:.3f}s\n')
	if outcome.status != KunitStatus.SUCCESS:
		sys.exit(1)
497
498
def build_handler(cli_args: argparse.Namespace) -> None:
	"""Handles `build`: configures then builds the kernel; exits 1 on failure.

	The elapsed time printed is that of whichever step produced the final
	result (the config step if it failed, the build step otherwise).
	"""
	linux = tree_from_args(cli_args)
	request = KunitBuildRequest(
		build_dir=cli_args.build_dir,
		make_options=cli_args.make_options,
		jobs=cli_args.jobs)
	outcome = config_and_build_tests(linux, request)
	stdout.print_with_timestamp(f'Elapsed time: {outcome.elapsed_time:.3f}s\n')
	if outcome.status != KunitStatus.SUCCESS:
		sys.exit(1)
510
511
def exec_handler(cli_args: argparse.Namespace) -> None:
	"""Handles `exec`: runs an already built kernel and parses its output.

	Prints the time taken and exits with status 1 unless the tests succeed.
	"""
	linux = tree_from_args(cli_args)
	request = KunitExecRequest(
		raw_output=cli_args.raw_output,
		build_dir=cli_args.build_dir,
		json=cli_args.json,
		summary=cli_args.summary,
		failed=cli_args.failed,
		timeout=cli_args.timeout,
		filter_glob=cli_args.filter_glob,
		filter=cli_args.filter,
		filter_action=cli_args.filter_action,
		kernel_args=cli_args.kernel_args,
		run_isolated=cli_args.run_isolated,
		list_tests=cli_args.list_tests,
		list_tests_attr=cli_args.list_tests_attr)
	outcome = exec_tests(linux, request)
	stdout.print_with_timestamp(f'Elapsed time: {outcome.elapsed_time:.3f}s\n')
	if outcome.status != KunitStatus.SUCCESS:
		sys.exit(1)
532
533
def parse_handler(cli_args: argparse.Namespace) -> None:
	"""Handles `parse`: parses results from a file or, without one, from stdin.

	Undecodable bytes are rendered as backslash escapes rather than raising.
	Exits with status 1 unless parsing reports success.
	"""
	kunit_output: Iterable[str]
	if cli_args.file is not None:
		with open(cli_args.file, 'r', errors='backslashreplace') as f:
			kunit_output = f.read().splitlines()
	else:
		sys.stdin.reconfigure(errors='backslashreplace')  # type: ignore
		kunit_output = sys.stdin

	# Nothing is known about how these results were produced, so the
	# metadata is left at its defaults.
	metadata = kunit_json.Metadata()
	request = KunitParseRequest(
		raw_output=cli_args.raw_output,
		json=cli_args.json,
		summary=cli_args.summary,
		failed=cli_args.failed)
	outcome, _ = parse_tests(request, metadata, kunit_output)
	if outcome.status != KunitStatus.SUCCESS:
		sys.exit(1)
549
550
# Maps each subcommand name (the value argparse stores in `subcommand`) to
# the function that implements it; main() looks the handler up here.
subcommand_handlers_map = {
	'run': run_handler,
	'config': config_handler,
	'build': build_handler,
	'exec': exec_handler,
	'parse': parse_handler
}
558
559
def main(argv: Sequence[str]) -> None:
	"""Parses `argv` and dispatches to the handler of the chosen subcommand.

	The working directory is changed to the kernel root before dispatching.
	When no subcommand is given, the help text is printed instead.
	"""
	parser = argparse.ArgumentParser(
		description='Helps writing and running KUnit tests.')
	subparser = parser.add_subparsers(dest='subcommand')

	# 'run' does config, build, exec and parse in one go, so it takes every
	# group of options.
	run_parser = subparser.add_parser('run', help='Runs KUnit tests.')
	for add_opts in (add_common_opts, add_build_opts, add_exec_opts, add_parse_opts):
		add_opts(run_parser)

	config_parser = subparser.add_parser(
		'config',
		help='Ensures that .config contains all of '
		'the options in .kunitconfig')
	add_common_opts(config_parser)

	build_parser = subparser.add_parser('build', help='Builds a kernel with KUnit tests')
	for add_opts in (add_common_opts, add_build_opts):
		add_opts(build_parser)

	exec_parser = subparser.add_parser('exec', help='Run a kernel with KUnit tests')
	for add_opts in (add_common_opts, add_exec_opts, add_parse_opts):
		add_opts(exec_parser)

	# 'parse' is special: it needs no kernel source, hence no build_dir and
	# no add_common_opts(). Its 'file' argument is meaningless for 'run',
	# which is why it is added here rather than in add_parse_opts().
	parse_parser = subparser.add_parser(
		'parse',
		help='Parses KUnit results from a file, '
		'and parses formatted results.')
	add_parse_opts(parse_parser)
	parse_parser.add_argument(
		'file',
		help='Specifies the file to read results from.',
		type=str, nargs='?', metavar='input_file')

	cli_args = parser.parse_args(massage_argv(argv))

	kernel_root = get_kernel_root_path()
	if kernel_root:
		os.chdir(kernel_root)

	handler = subcommand_handlers_map.get(cli_args.subcommand)
	if handler is None:
		parser.print_help()
		return

	handler(cli_args)
610
611
# Script entry point: hand everything after the program name to main().
if __name__ == '__main__':
	main(sys.argv[1:])
614