xref: /linux/tools/testing/kunit/kunit.py (revision 876f7a438e4247a948268ad77b67c494f709cc30)
1#!/usr/bin/env python3
2# SPDX-License-Identifier: GPL-2.0
3#
4# A thin wrapper on top of the KUnit Kernel
5#
6# Copyright (C) 2019, Google LLC.
7# Author: Felix Guo <felixguoxiuping@gmail.com>
8# Author: Brendan Higgins <brendanhiggins@google.com>
9
10import argparse
11import os
12import re
13import sys
14import time
15
16assert sys.version_info >= (3, 7), "Python version is too old"
17
18from dataclasses import dataclass
19from enum import Enum, auto
20from typing import Any, Iterable, Sequence, List, Optional
21
22import kunit_json
23import kunit_kernel
24import kunit_parser
25
26class KunitStatus(Enum):
27	SUCCESS = auto()
28	CONFIG_FAILURE = auto()
29	BUILD_FAILURE = auto()
30	TEST_FAILURE = auto()
31
32@dataclass
33class KunitResult:
34	status: KunitStatus
35	result: Any
36	elapsed_time: float
37
38@dataclass
39class KunitConfigRequest:
40	build_dir: str
41	make_options: Optional[List[str]]
42
43@dataclass
44class KunitBuildRequest(KunitConfigRequest):
45	jobs: int
46	alltests: bool
47
48@dataclass
49class KunitParseRequest:
50	raw_output: Optional[str]
51	build_dir: str
52	json: Optional[str]
53
54@dataclass
55class KunitExecRequest(KunitParseRequest):
56	timeout: int
57	alltests: bool
58	filter_glob: str
59	kernel_args: Optional[List[str]]
60	run_isolated: Optional[str]
61
62@dataclass
63class KunitRequest(KunitExecRequest, KunitBuildRequest):
64	pass
65
66
67KernelDirectoryPath = sys.argv[0].split('tools/testing/kunit/')[0]
68
69def get_kernel_root_path() -> str:
70	path = sys.argv[0] if not __file__ else __file__
71	parts = os.path.realpath(path).split('tools/testing/kunit')
72	if len(parts) != 2:
73		sys.exit(1)
74	return parts[0]
75
76def config_tests(linux: kunit_kernel.LinuxSourceTree,
77		 request: KunitConfigRequest) -> KunitResult:
78	kunit_parser.print_with_timestamp('Configuring KUnit Kernel ...')
79
80	config_start = time.time()
81	success = linux.build_reconfig(request.build_dir, request.make_options)
82	config_end = time.time()
83	if not success:
84		return KunitResult(KunitStatus.CONFIG_FAILURE,
85				   'could not configure kernel',
86				   config_end - config_start)
87	return KunitResult(KunitStatus.SUCCESS,
88			   'configured kernel successfully',
89			   config_end - config_start)
90
91def build_tests(linux: kunit_kernel.LinuxSourceTree,
92		request: KunitBuildRequest) -> KunitResult:
93	kunit_parser.print_with_timestamp('Building KUnit Kernel ...')
94
95	build_start = time.time()
96	success = linux.build_kernel(request.alltests,
97				     request.jobs,
98				     request.build_dir,
99				     request.make_options)
100	build_end = time.time()
101	if not success:
102		return KunitResult(KunitStatus.BUILD_FAILURE,
103				   'could not build kernel',
104				   build_end - build_start)
105	if not success:
106		return KunitResult(KunitStatus.BUILD_FAILURE,
107				   'could not build kernel',
108				   build_end - build_start)
109	return KunitResult(KunitStatus.SUCCESS,
110			   'built kernel successfully',
111			   build_end - build_start)
112
113def config_and_build_tests(linux: kunit_kernel.LinuxSourceTree,
114			   request: KunitBuildRequest) -> KunitResult:
115	config_result = config_tests(linux, request)
116	if config_result.status != KunitStatus.SUCCESS:
117		return config_result
118
119	return build_tests(linux, request)
120
121def _list_tests(linux: kunit_kernel.LinuxSourceTree, request: KunitExecRequest) -> List[str]:
122	args = ['kunit.action=list']
123	if request.kernel_args:
124		args.extend(request.kernel_args)
125
126	output = linux.run_kernel(args=args,
127			   timeout=None if request.alltests else request.timeout,
128			   filter_glob=request.filter_glob,
129			   build_dir=request.build_dir)
130	lines = kunit_parser.extract_tap_lines(output)
131	# Hack! Drop the dummy TAP version header that the executor prints out.
132	lines.pop()
133
134	# Filter out any extraneous non-test output that might have gotten mixed in.
135	return [l for l in lines if re.match('^[^\s.]+\.[^\s.]+$', l)]
136
137def _suites_from_test_list(tests: List[str]) -> List[str]:
138	"""Extracts all the suites from an ordered list of tests."""
139	suites = []  # type: List[str]
140	for t in tests:
141		parts = t.split('.', maxsplit=2)
142		if len(parts) != 2:
143			raise ValueError(f'internal KUnit error, test name should be of the form "<suite>.<test>", got "{t}"')
144		suite, case = parts
145		if not suites or suites[-1] != suite:
146			suites.append(suite)
147	return suites
148
149
150
151def exec_tests(linux: kunit_kernel.LinuxSourceTree, request: KunitExecRequest) -> KunitResult:
152	filter_globs = [request.filter_glob]
153	if request.run_isolated:
154		tests = _list_tests(linux, request)
155		if request.run_isolated == 'test':
156			filter_globs = tests
157		if request.run_isolated == 'suite':
158			filter_globs = _suites_from_test_list(tests)
159			# Apply the test-part of the user's glob, if present.
160			if '.' in request.filter_glob:
161				test_glob = request.filter_glob.split('.', maxsplit=2)[1]
162				filter_globs = [g + '.'+ test_glob for g in filter_globs]
163
164	test_counts = kunit_parser.TestCounts()
165	exec_time = 0.0
166	for i, filter_glob in enumerate(filter_globs):
167		kunit_parser.print_with_timestamp('Starting KUnit Kernel ({}/{})...'.format(i+1, len(filter_globs)))
168
169		test_start = time.time()
170		run_result = linux.run_kernel(
171			args=request.kernel_args,
172			timeout=None if request.alltests else request.timeout,
173			filter_glob=filter_glob,
174			build_dir=request.build_dir)
175
176		result = parse_tests(request, run_result)
177		# run_kernel() doesn't block on the kernel exiting.
178		# That only happens after we get the last line of output from `run_result`.
179		# So exec_time here actually contains parsing + execution time, which is fine.
180		test_end = time.time()
181		exec_time += test_end - test_start
182
183		test_counts.add_subtest_counts(result.result.counts)
184
185	if len(filter_globs) == 1 and test_counts.crashed > 0:
186		bd = request.build_dir
187		print('The kernel seems to have crashed; you can decode the stack traces with:')
188		print('$ scripts/decode_stacktrace.sh {}/vmlinux {} < {} | tee {}/decoded.log | {} parse'.format(
189				bd, bd, kunit_kernel.get_outfile_path(bd), bd, sys.argv[0]))
190
191	kunit_status = _map_to_overall_status(test_counts.get_status())
192	return KunitResult(status=kunit_status, result=result, elapsed_time=exec_time)
193
194def _map_to_overall_status(test_status: kunit_parser.TestStatus) -> KunitStatus:
195	if test_status in (kunit_parser.TestStatus.SUCCESS, kunit_parser.TestStatus.SKIPPED):
196		return KunitStatus.SUCCESS
197	else:
198		return KunitStatus.TEST_FAILURE
199
200def parse_tests(request: KunitParseRequest, input_data: Iterable[str]) -> KunitResult:
201	parse_start = time.time()
202
203	test_result = kunit_parser.Test()
204
205	if request.raw_output:
206		# Treat unparsed results as one passing test.
207		test_result.status = kunit_parser.TestStatus.SUCCESS
208		test_result.counts.passed = 1
209
210		output: Iterable[str] = input_data
211		if request.raw_output == 'all':
212			pass
213		elif request.raw_output == 'kunit':
214			output = kunit_parser.extract_tap_lines(output)
215		else:
216			print(f'Unknown --raw_output option "{request.raw_output}"', file=sys.stderr)
217		for line in output:
218			print(line.rstrip())
219
220	else:
221		test_result = kunit_parser.parse_run_tests(input_data)
222	parse_end = time.time()
223
224	if request.json:
225		json_obj = kunit_json.get_json_result(
226					test=test_result,
227					def_config='kunit_defconfig',
228					build_dir=request.build_dir,
229					json_path=request.json)
230		if request.json == 'stdout':
231			print(json_obj)
232
233	if test_result.status != kunit_parser.TestStatus.SUCCESS:
234		return KunitResult(KunitStatus.TEST_FAILURE, test_result,
235				   parse_end - parse_start)
236
237	return KunitResult(KunitStatus.SUCCESS, test_result,
238				parse_end - parse_start)
239
240def run_tests(linux: kunit_kernel.LinuxSourceTree,
241	      request: KunitRequest) -> KunitResult:
242	run_start = time.time()
243
244	config_result = config_tests(linux, request)
245	if config_result.status != KunitStatus.SUCCESS:
246		return config_result
247
248	build_result = build_tests(linux, request)
249	if build_result.status != KunitStatus.SUCCESS:
250		return build_result
251
252	exec_result = exec_tests(linux, request)
253
254	run_end = time.time()
255
256	kunit_parser.print_with_timestamp((
257		'Elapsed time: %.3fs total, %.3fs configuring, %.3fs ' +
258		'building, %.3fs running\n') % (
259				run_end - run_start,
260				config_result.elapsed_time,
261				build_result.elapsed_time,
262				exec_result.elapsed_time))
263	return exec_result
264
265# Problem:
266# $ kunit.py run --json
267# works as one would expect and prints the parsed test results as JSON.
268# $ kunit.py run --json suite_name
269# would *not* pass suite_name as the filter_glob and print as json.
270# argparse will consider it to be another way of writing
271# $ kunit.py run --json=suite_name
272# i.e. it would run all tests, and dump the json to a `suite_name` file.
273# So we hackily automatically rewrite --json => --json=stdout
274pseudo_bool_flag_defaults = {
275		'--json': 'stdout',
276		'--raw_output': 'kunit',
277}
278def massage_argv(argv: Sequence[str]) -> Sequence[str]:
279	def massage_arg(arg: str) -> str:
280		if arg not in pseudo_bool_flag_defaults:
281			return arg
282		return  f'{arg}={pseudo_bool_flag_defaults[arg]}'
283	return list(map(massage_arg, argv))
284
285def get_default_jobs() -> int:
286	return len(os.sched_getaffinity(0))
287
288def add_common_opts(parser) -> None:
289	parser.add_argument('--build_dir',
290			    help='As in the make command, it specifies the build '
291			    'directory.',
292			    type=str, default='.kunit', metavar='build_dir')
293	parser.add_argument('--make_options',
294			    help='X=Y make option, can be repeated.',
295			    action='append')
296	parser.add_argument('--alltests',
297			    help='Run all KUnit tests through allyesconfig',
298			    action='store_true')
299	parser.add_argument('--kunitconfig',
300			     help='Path to Kconfig fragment that enables KUnit tests.'
301			     ' If given a directory, (e.g. lib/kunit), "/.kunitconfig" '
302			     'will get  automatically appended.',
303			     metavar='kunitconfig')
304	parser.add_argument('--kconfig_add',
305			     help='Additional Kconfig options to append to the '
306			     '.kunitconfig, e.g. CONFIG_KASAN=y. Can be repeated.',
307			    action='append')
308
309	parser.add_argument('--arch',
310			    help=('Specifies the architecture to run tests under. '
311				  'The architecture specified here must match the '
312				  'string passed to the ARCH make param, '
313				  'e.g. i386, x86_64, arm, um, etc. Non-UML '
314				  'architectures run on QEMU.'),
315			    type=str, default='um', metavar='arch')
316
317	parser.add_argument('--cross_compile',
318			    help=('Sets make\'s CROSS_COMPILE variable; it should '
319				  'be set to a toolchain path prefix (the prefix '
320				  'of gcc and other tools in your toolchain, for '
321				  'example `sparc64-linux-gnu-` if you have the '
322				  'sparc toolchain installed on your system, or '
323				  '`$HOME/toolchains/microblaze/gcc-9.2.0-nolibc/microblaze-linux/bin/microblaze-linux-` '
324				  'if you have downloaded the microblaze toolchain '
325				  'from the 0-day website to a directory in your '
326				  'home directory called `toolchains`).'),
327			    metavar='cross_compile')
328
329	parser.add_argument('--qemu_config',
330			    help=('Takes a path to a path to a file containing '
331				  'a QemuArchParams object.'),
332			    type=str, metavar='qemu_config')
333
334def add_build_opts(parser) -> None:
335	parser.add_argument('--jobs',
336			    help='As in the make command, "Specifies  the number of '
337			    'jobs (commands) to run simultaneously."',
338			    type=int, default=get_default_jobs(), metavar='jobs')
339
340def add_exec_opts(parser) -> None:
341	parser.add_argument('--timeout',
342			    help='maximum number of seconds to allow for all tests '
343			    'to run. This does not include time taken to build the '
344			    'tests.',
345			    type=int,
346			    default=300,
347			    metavar='timeout')
348	parser.add_argument('filter_glob',
349			    help='Filter which KUnit test suites/tests run at '
350			    'boot-time, e.g. list* or list*.*del_test',
351			    type=str,
352			    nargs='?',
353			    default='',
354			    metavar='filter_glob')
355	parser.add_argument('--kernel_args',
356			    help='Kernel command-line parameters. Maybe be repeated',
357			     action='append')
358	parser.add_argument('--run_isolated', help='If set, boot the kernel for each '
359			    'individual suite/test. This is can be useful for debugging '
360			    'a non-hermetic test, one that might pass/fail based on '
361			    'what ran before it.',
362			    type=str,
363			    choices=['suite', 'test']),
364
365def add_parse_opts(parser) -> None:
366	parser.add_argument('--raw_output', help='If set don\'t format output from kernel. '
367			    'If set to --raw_output=kunit, filters to just KUnit output.',
368			    type=str, nargs='?', const='all', default=None)
369	parser.add_argument('--json',
370			    nargs='?',
371			    help='Stores test results in a JSON, and either '
372			    'prints to stdout or saves to file if a '
373			    'filename is specified',
374			    type=str, const='stdout', default=None)
375
376def main(argv, linux=None):
377	parser = argparse.ArgumentParser(
378			description='Helps writing and running KUnit tests.')
379	subparser = parser.add_subparsers(dest='subcommand')
380
381	# The 'run' command will config, build, exec, and parse in one go.
382	run_parser = subparser.add_parser('run', help='Runs KUnit tests.')
383	add_common_opts(run_parser)
384	add_build_opts(run_parser)
385	add_exec_opts(run_parser)
386	add_parse_opts(run_parser)
387
388	config_parser = subparser.add_parser('config',
389						help='Ensures that .config contains all of '
390						'the options in .kunitconfig')
391	add_common_opts(config_parser)
392
393	build_parser = subparser.add_parser('build', help='Builds a kernel with KUnit tests')
394	add_common_opts(build_parser)
395	add_build_opts(build_parser)
396
397	exec_parser = subparser.add_parser('exec', help='Run a kernel with KUnit tests')
398	add_common_opts(exec_parser)
399	add_exec_opts(exec_parser)
400	add_parse_opts(exec_parser)
401
402	# The 'parse' option is special, as it doesn't need the kernel source
403	# (therefore there is no need for a build_dir, hence no add_common_opts)
404	# and the '--file' argument is not relevant to 'run', so isn't in
405	# add_parse_opts()
406	parse_parser = subparser.add_parser('parse',
407					    help='Parses KUnit results from a file, '
408					    'and parses formatted results.')
409	add_parse_opts(parse_parser)
410	parse_parser.add_argument('file',
411				  help='Specifies the file to read results from.',
412				  type=str, nargs='?', metavar='input_file')
413
414	cli_args = parser.parse_args(massage_argv(argv))
415
416	if get_kernel_root_path():
417		os.chdir(get_kernel_root_path())
418
419	if cli_args.subcommand == 'run':
420		if not os.path.exists(cli_args.build_dir):
421			os.mkdir(cli_args.build_dir)
422
423		if not linux:
424			linux = kunit_kernel.LinuxSourceTree(cli_args.build_dir,
425					kunitconfig_path=cli_args.kunitconfig,
426					kconfig_add=cli_args.kconfig_add,
427					arch=cli_args.arch,
428					cross_compile=cli_args.cross_compile,
429					qemu_config_path=cli_args.qemu_config)
430
431		request = KunitRequest(build_dir=cli_args.build_dir,
432				       make_options=cli_args.make_options,
433				       jobs=cli_args.jobs,
434				       alltests=cli_args.alltests,
435				       raw_output=cli_args.raw_output,
436				       json=cli_args.json,
437				       timeout=cli_args.timeout,
438				       filter_glob=cli_args.filter_glob,
439				       kernel_args=cli_args.kernel_args,
440				       run_isolated=cli_args.run_isolated)
441		result = run_tests(linux, request)
442		if result.status != KunitStatus.SUCCESS:
443			sys.exit(1)
444	elif cli_args.subcommand == 'config':
445		if cli_args.build_dir and (
446				not os.path.exists(cli_args.build_dir)):
447			os.mkdir(cli_args.build_dir)
448
449		if not linux:
450			linux = kunit_kernel.LinuxSourceTree(cli_args.build_dir,
451					kunitconfig_path=cli_args.kunitconfig,
452					kconfig_add=cli_args.kconfig_add,
453					arch=cli_args.arch,
454					cross_compile=cli_args.cross_compile,
455					qemu_config_path=cli_args.qemu_config)
456
457		request = KunitConfigRequest(build_dir=cli_args.build_dir,
458					     make_options=cli_args.make_options)
459		result = config_tests(linux, request)
460		kunit_parser.print_with_timestamp((
461			'Elapsed time: %.3fs\n') % (
462				result.elapsed_time))
463		if result.status != KunitStatus.SUCCESS:
464			sys.exit(1)
465	elif cli_args.subcommand == 'build':
466		if not linux:
467			linux = kunit_kernel.LinuxSourceTree(cli_args.build_dir,
468					kunitconfig_path=cli_args.kunitconfig,
469					kconfig_add=cli_args.kconfig_add,
470					arch=cli_args.arch,
471					cross_compile=cli_args.cross_compile,
472					qemu_config_path=cli_args.qemu_config)
473
474		request = KunitBuildRequest(build_dir=cli_args.build_dir,
475					    make_options=cli_args.make_options,
476					    jobs=cli_args.jobs,
477					    alltests=cli_args.alltests)
478		result = config_and_build_tests(linux, request)
479		kunit_parser.print_with_timestamp((
480			'Elapsed time: %.3fs\n') % (
481				result.elapsed_time))
482		if result.status != KunitStatus.SUCCESS:
483			sys.exit(1)
484	elif cli_args.subcommand == 'exec':
485		if not linux:
486			linux = kunit_kernel.LinuxSourceTree(cli_args.build_dir,
487					kunitconfig_path=cli_args.kunitconfig,
488					kconfig_add=cli_args.kconfig_add,
489					arch=cli_args.arch,
490					cross_compile=cli_args.cross_compile,
491					qemu_config_path=cli_args.qemu_config)
492
493		exec_request = KunitExecRequest(raw_output=cli_args.raw_output,
494						build_dir=cli_args.build_dir,
495						json=cli_args.json,
496						timeout=cli_args.timeout,
497						alltests=cli_args.alltests,
498						filter_glob=cli_args.filter_glob,
499						kernel_args=cli_args.kernel_args,
500						run_isolated=cli_args.run_isolated)
501		result = exec_tests(linux, exec_request)
502		kunit_parser.print_with_timestamp((
503			'Elapsed time: %.3fs\n') % (result.elapsed_time))
504		if result.status != KunitStatus.SUCCESS:
505			sys.exit(1)
506	elif cli_args.subcommand == 'parse':
507		if cli_args.file == None:
508			sys.stdin.reconfigure(errors='backslashreplace')  # pytype: disable=attribute-error
509			kunit_output = sys.stdin
510		else:
511			with open(cli_args.file, 'r', errors='backslashreplace') as f:
512				kunit_output = f.read().splitlines()
513		request = KunitParseRequest(raw_output=cli_args.raw_output,
514					    build_dir='',
515					    json=cli_args.json)
516		result = parse_tests(request, kunit_output)
517		if result.status != KunitStatus.SUCCESS:
518			sys.exit(1)
519	else:
520		parser.print_help()
521
522if __name__ == '__main__':
523	main(sys.argv[1:])
524