xref: /linux/tools/testing/kunit/kunit_kernel.py (revision 74f1af95820fc2ee580a775a3a17c416db30b38c)
1# SPDX-License-Identifier: GPL-2.0
2#
3# Runs UML kernel, collects output, and handles errors.
4#
5# Copyright (C) 2019, Google LLC.
6# Author: Felix Guo <felixguoxiuping@gmail.com>
7# Author: Brendan Higgins <brendanhiggins@google.com>
8
9import importlib.abc
10import importlib.util
11import logging
12import subprocess
13import os
14import shlex
15import shutil
16import signal
17import sys
18import threading
19from typing import Iterator, List, Optional, Tuple
20from types import FrameType
21
22import kunit_config
23import qemu_config
24
25KCONFIG_PATH = '.config'
26KUNITCONFIG_PATH = '.kunitconfig'
27OLD_KUNITCONFIG_PATH = 'last_used_kunitconfig'
28DEFAULT_KUNITCONFIG_PATH = 'tools/testing/kunit/configs/default.config'
29ALL_TESTS_CONFIG_PATH = 'tools/testing/kunit/configs/all_tests.config'
30UML_KCONFIG_PATH = 'tools/testing/kunit/configs/arch_uml.config'
31OUTFILE_PATH = 'test.log'
32ABS_TOOL_PATH = os.path.abspath(os.path.dirname(__file__))
33QEMU_CONFIGS_DIR = os.path.join(ABS_TOOL_PATH, 'qemu_configs')
34
35class ConfigError(Exception):
36	"""Represents an error trying to configure the Linux kernel."""
37
38
39class BuildError(Exception):
40	"""Represents an error trying to build the Linux kernel."""
41
42
43class LinuxSourceTreeOperations:
44	"""An abstraction over command line operations performed on a source tree."""
45
46	def __init__(self, linux_arch: str, cross_compile: Optional[str]):
47		self._linux_arch = linux_arch
48		self._cross_compile = cross_compile
49
50	def make_mrproper(self) -> None:
51		try:
52			subprocess.check_output(['make', 'mrproper'], stderr=subprocess.STDOUT)
53		except OSError as e:
54			raise ConfigError('Could not call make command: ' + str(e))
55		except subprocess.CalledProcessError as e:
56			raise ConfigError(e.output.decode())
57
58	def make_arch_config(self, base_kunitconfig: kunit_config.Kconfig) -> kunit_config.Kconfig:
59		return base_kunitconfig
60
61	def make_olddefconfig(self, build_dir: str, make_options: Optional[List[str]]) -> None:
62		command = ['make', 'ARCH=' + self._linux_arch, 'O=' + build_dir, 'olddefconfig']
63		if self._cross_compile:
64			command += ['CROSS_COMPILE=' + self._cross_compile]
65		if make_options:
66			command.extend(make_options)
67		print('Populating config with:\n$', ' '.join(command))
68		try:
69			subprocess.check_output(command, stderr=subprocess.STDOUT)
70		except OSError as e:
71			raise ConfigError('Could not call make command: ' + str(e))
72		except subprocess.CalledProcessError as e:
73			raise ConfigError(e.output.decode())
74
75	def make(self, jobs: int, build_dir: str, make_options: Optional[List[str]]) -> None:
76		command = ['make', 'all', 'compile_commands.json', 'scripts_gdb',
77			   'ARCH=' + self._linux_arch, 'O=' + build_dir, '--jobs=' + str(jobs)]
78		if make_options:
79			command.extend(make_options)
80		if self._cross_compile:
81			command += ['CROSS_COMPILE=' + self._cross_compile]
82		print('Building with:\n$', ' '.join(command))
83		try:
84			proc = subprocess.Popen(command,
85						stderr=subprocess.PIPE,
86						stdout=subprocess.DEVNULL)
87		except OSError as e:
88			raise BuildError('Could not call execute make: ' + str(e))
89		except subprocess.CalledProcessError as e:
90			raise BuildError(e.output)
91		_, stderr = proc.communicate()
92		if proc.returncode != 0:
93			raise BuildError(stderr.decode())
94		if stderr:  # likely only due to build warnings
95			print(stderr.decode())
96
97	def start(self, params: List[str], build_dir: str) -> subprocess.Popen:
98		raise RuntimeError('not implemented!')
99
100
101class LinuxSourceTreeOperationsQemu(LinuxSourceTreeOperations):
102
103	def __init__(self, qemu_arch_params: qemu_config.QemuArchParams, cross_compile: Optional[str]):
104		super().__init__(linux_arch=qemu_arch_params.linux_arch,
105				 cross_compile=cross_compile)
106		self._kconfig = qemu_arch_params.kconfig
107		self._qemu_arch = qemu_arch_params.qemu_arch
108		self._kernel_path = qemu_arch_params.kernel_path
109		self._kernel_command_line = qemu_arch_params.kernel_command_line
110		if 'kunit_shutdown=' not in self._kernel_command_line:
111			self._kernel_command_line += ' kunit_shutdown=reboot'
112		self._extra_qemu_params = qemu_arch_params.extra_qemu_params
113		self._serial = qemu_arch_params.serial
114
115	def make_arch_config(self, base_kunitconfig: kunit_config.Kconfig) -> kunit_config.Kconfig:
116		kconfig = kunit_config.parse_from_string(self._kconfig)
117		kconfig.merge_in_entries(base_kunitconfig)
118		return kconfig
119
120	def start(self, params: List[str], build_dir: str) -> subprocess.Popen:
121		kernel_path = os.path.join(build_dir, self._kernel_path)
122		qemu_command = ['qemu-system-' + self._qemu_arch,
123				'-nodefaults',
124				'-m', '1024',
125				'-kernel', kernel_path,
126				'-append', ' '.join(params + [self._kernel_command_line]),
127				'-no-reboot',
128				'-nographic',
129				'-accel', 'kvm',
130				'-accel', 'hvf',
131				'-accel', 'tcg',
132				'-serial', self._serial] + self._extra_qemu_params
133		# Note: shlex.join() does what we want, but requires python 3.8+.
134		print('Running tests with:\n$', ' '.join(shlex.quote(arg) for arg in qemu_command))
135		return subprocess.Popen(qemu_command,
136					stdin=subprocess.PIPE,
137					stdout=subprocess.PIPE,
138					stderr=subprocess.STDOUT,
139					text=True, errors='backslashreplace')
140
141class LinuxSourceTreeOperationsUml(LinuxSourceTreeOperations):
142	"""An abstraction over command line operations performed on a source tree."""
143
144	def __init__(self, cross_compile: Optional[str]=None):
145		super().__init__(linux_arch='um', cross_compile=cross_compile)
146
147	def make_arch_config(self, base_kunitconfig: kunit_config.Kconfig) -> kunit_config.Kconfig:
148		kconfig = kunit_config.parse_file(UML_KCONFIG_PATH)
149		kconfig.merge_in_entries(base_kunitconfig)
150		return kconfig
151
152	def start(self, params: List[str], build_dir: str) -> subprocess.Popen:
153		"""Runs the Linux UML binary. Must be named 'linux'."""
154		linux_bin = os.path.join(build_dir, 'linux')
155		params.extend(['mem=1G', 'console=tty', 'kunit_shutdown=halt'])
156		print('Running tests with:\n$', linux_bin, ' '.join(shlex.quote(arg) for arg in params))
157		return subprocess.Popen([linux_bin] + params,
158					   stdin=subprocess.PIPE,
159					   stdout=subprocess.PIPE,
160					   stderr=subprocess.STDOUT,
161					   text=True, errors='backslashreplace')
162
163def get_kconfig_path(build_dir: str) -> str:
164	return os.path.join(build_dir, KCONFIG_PATH)
165
166def get_kunitconfig_path(build_dir: str) -> str:
167	return os.path.join(build_dir, KUNITCONFIG_PATH)
168
169def get_old_kunitconfig_path(build_dir: str) -> str:
170	return os.path.join(build_dir, OLD_KUNITCONFIG_PATH)
171
172def get_parsed_kunitconfig(build_dir: str,
173			   kunitconfig_paths: Optional[List[str]]=None) -> kunit_config.Kconfig:
174	if not kunitconfig_paths:
175		path = get_kunitconfig_path(build_dir)
176		if not os.path.exists(path):
177			shutil.copyfile(DEFAULT_KUNITCONFIG_PATH, path)
178		return kunit_config.parse_file(path)
179
180	merged = kunit_config.Kconfig()
181
182	for path in kunitconfig_paths:
183		if os.path.isdir(path):
184			path = os.path.join(path, KUNITCONFIG_PATH)
185		if not os.path.exists(path):
186			raise ConfigError(f'Specified kunitconfig ({path}) does not exist')
187
188		partial = kunit_config.parse_file(path)
189		diff = merged.conflicting_options(partial)
190		if diff:
191			diff_str = '\n\n'.join(f'{a}\n  vs from {path}\n{b}' for a, b in diff)
192			raise ConfigError(f'Multiple values specified for {len(diff)} options in kunitconfig:\n{diff_str}')
193		merged.merge_in_entries(partial)
194	return merged
195
196def get_outfile_path(build_dir: str) -> str:
197	return os.path.join(build_dir, OUTFILE_PATH)
198
199def _default_qemu_config_path(arch: str) -> str:
200	config_path = os.path.join(QEMU_CONFIGS_DIR, arch + '.py')
201	if os.path.isfile(config_path):
202		return config_path
203
204	options = [f[:-3] for f in os.listdir(QEMU_CONFIGS_DIR) if f.endswith('.py')]
205
206	if arch == 'help':
207		print('um')
208		for option in options:
209			print(option)
210		sys.exit()
211
212	raise ConfigError(arch + ' is not a valid arch, options are ' + str(sorted(options)))
213
214def _get_qemu_ops(config_path: str,
215		  extra_qemu_args: Optional[List[str]],
216		  cross_compile: Optional[str]) -> Tuple[str, LinuxSourceTreeOperations]:
217	# The module name/path has very little to do with where the actual file
218	# exists (I learned this through experimentation and could not find it
219	# anywhere in the Python documentation).
220	#
221	# Bascially, we completely ignore the actual file location of the config
222	# we are loading and just tell Python that the module lives in the
223	# QEMU_CONFIGS_DIR for import purposes regardless of where it actually
224	# exists as a file.
225	module_path = '.' + os.path.join(os.path.basename(QEMU_CONFIGS_DIR), os.path.basename(config_path))
226	spec = importlib.util.spec_from_file_location(module_path, config_path)
227	assert spec is not None
228	config = importlib.util.module_from_spec(spec)
229	# See https://github.com/python/typeshed/pull/2626 for context.
230	assert isinstance(spec.loader, importlib.abc.Loader)
231	spec.loader.exec_module(config)
232
233	if not hasattr(config, 'QEMU_ARCH'):
234		raise ValueError('qemu_config module missing "QEMU_ARCH": ' + config_path)
235	params: qemu_config.QemuArchParams = config.QEMU_ARCH
236	if extra_qemu_args:
237		params.extra_qemu_params.extend(extra_qemu_args)
238	return params.linux_arch, LinuxSourceTreeOperationsQemu(
239			params, cross_compile=cross_compile)
240
241class LinuxSourceTree:
242	"""Represents a Linux kernel source tree with KUnit tests."""
243
244	def __init__(
245	      self,
246	      build_dir: str,
247	      kunitconfig_paths: Optional[List[str]]=None,
248	      kconfig_add: Optional[List[str]]=None,
249	      arch: Optional[str]=None,
250	      cross_compile: Optional[str]=None,
251	      qemu_config_path: Optional[str]=None,
252	      extra_qemu_args: Optional[List[str]]=None) -> None:
253		signal.signal(signal.SIGINT, self.signal_handler)
254		if qemu_config_path:
255			self._arch, self._ops = _get_qemu_ops(qemu_config_path, extra_qemu_args, cross_compile)
256		else:
257			self._arch = 'um' if arch is None else arch
258			if self._arch == 'um':
259				self._ops = LinuxSourceTreeOperationsUml(cross_compile=cross_compile)
260			else:
261				qemu_config_path = _default_qemu_config_path(self._arch)
262				_, self._ops = _get_qemu_ops(qemu_config_path, extra_qemu_args, cross_compile)
263
264		self._kconfig = get_parsed_kunitconfig(build_dir, kunitconfig_paths)
265		if kconfig_add:
266			kconfig = kunit_config.parse_from_string('\n'.join(kconfig_add))
267			self._kconfig.merge_in_entries(kconfig)
268
269	def arch(self) -> str:
270		return self._arch
271
272	def clean(self) -> bool:
273		try:
274			self._ops.make_mrproper()
275		except ConfigError as e:
276			logging.error(e)
277			return False
278		return True
279
280	def validate_config(self, build_dir: str) -> bool:
281		kconfig_path = get_kconfig_path(build_dir)
282		validated_kconfig = kunit_config.parse_file(kconfig_path)
283		if self._kconfig.is_subset_of(validated_kconfig):
284			return True
285		missing = set(self._kconfig.as_entries()) - set(validated_kconfig.as_entries())
286		message = 'Not all Kconfig options selected in kunitconfig were in the generated .config.\n' \
287			  'This is probably due to unsatisfied dependencies.\n' \
288			  'Missing: ' + ', '.join(str(e) for e in missing)
289		if self._arch == 'um':
290			message += '\nNote: many Kconfig options aren\'t available on UML. You can try running ' \
291				   'on a different architecture with something like "--arch=x86_64".'
292		logging.error(message)
293		return False
294
295	def build_config(self, build_dir: str, make_options: Optional[List[str]]) -> bool:
296		kconfig_path = get_kconfig_path(build_dir)
297		if build_dir and not os.path.exists(build_dir):
298			os.mkdir(build_dir)
299		try:
300			self._kconfig = self._ops.make_arch_config(self._kconfig)
301			self._kconfig.write_to_file(kconfig_path)
302			self._ops.make_olddefconfig(build_dir, make_options)
303		except ConfigError as e:
304			logging.error(e)
305			return False
306		if not self.validate_config(build_dir):
307			return False
308
309		old_path = get_old_kunitconfig_path(build_dir)
310		if os.path.exists(old_path):
311			os.remove(old_path)  # write_to_file appends to the file
312		self._kconfig.write_to_file(old_path)
313		return True
314
315	def _kunitconfig_changed(self, build_dir: str) -> bool:
316		old_path = get_old_kunitconfig_path(build_dir)
317		if not os.path.exists(old_path):
318			return True
319
320		old_kconfig = kunit_config.parse_file(old_path)
321		return old_kconfig != self._kconfig
322
323	def build_reconfig(self, build_dir: str, make_options: Optional[List[str]]) -> bool:
324		"""Creates a new .config if it is not a subset of the .kunitconfig."""
325		kconfig_path = get_kconfig_path(build_dir)
326		if not os.path.exists(kconfig_path):
327			print('Generating .config ...')
328			return self.build_config(build_dir, make_options)
329
330		existing_kconfig = kunit_config.parse_file(kconfig_path)
331		self._kconfig = self._ops.make_arch_config(self._kconfig)
332
333		if self._kconfig.is_subset_of(existing_kconfig) and not self._kunitconfig_changed(build_dir):
334			return True
335		print('Regenerating .config ...')
336		os.remove(kconfig_path)
337		return self.build_config(build_dir, make_options)
338
339	def build_kernel(self, jobs: int, build_dir: str, make_options: Optional[List[str]]) -> bool:
340		try:
341			self._ops.make_olddefconfig(build_dir, make_options)
342			self._ops.make(jobs, build_dir, make_options)
343		except (ConfigError, BuildError) as e:
344			logging.error(e)
345			return False
346		return self.validate_config(build_dir)
347
348	def run_kernel(self, args: Optional[List[str]]=None, build_dir: str='', filter_glob: str='', filter: str='', filter_action: Optional[str]=None, timeout: Optional[int]=None) -> Iterator[str]:
349		if not args:
350			args = []
351		if filter_glob:
352			args.append('kunit.filter_glob=' + filter_glob)
353		if filter:
354			args.append('kunit.filter="' + filter + '"')
355		if filter_action:
356			args.append('kunit.filter_action=' + filter_action)
357		args.append('kunit.enable=1')
358
359		process = self._ops.start(args, build_dir)
360		assert process.stdout is not None  # tell mypy it's set
361
362		# Enforce the timeout in a background thread.
363		def _wait_proc() -> None:
364			try:
365				process.wait(timeout=timeout)
366			except Exception as e:
367				print(e)
368				process.terminate()
369				process.wait()
370		waiter = threading.Thread(target=_wait_proc)
371		waiter.start()
372
373		output = open(get_outfile_path(build_dir), 'w')
374		try:
375			# Tee the output to the file and to our caller in real time.
376			for line in process.stdout:
377				output.write(line)
378				yield line
379		# This runs even if our caller doesn't consume every line.
380		finally:
381			# Flush any leftover output to the file
382			output.write(process.stdout.read())
383			output.close()
384			process.stdout.close()
385
386			waiter.join()
387			subprocess.call(['stty', 'sane'])
388
389	def signal_handler(self, unused_sig: int, unused_frame: Optional[FrameType]) -> None:
390		logging.error('Build interruption occurred. Cleaning console.')
391		subprocess.call(['stty', 'sane'])
392