xref: /linux/tools/testing/kunit/kunit_kernel.py (revision a10c7949adf94356e56d5c8878f6fc3f25bd0c15)
1# SPDX-License-Identifier: GPL-2.0
2#
3# Runs UML kernel, collects output, and handles errors.
4#
5# Copyright (C) 2019, Google LLC.
6# Author: Felix Guo <felixguoxiuping@gmail.com>
7# Author: Brendan Higgins <brendanhiggins@google.com>
8
9import importlib.abc
10import importlib.util
11import logging
12import subprocess
13import os
14import shlex
15import shutil
16import signal
17import threading
18from typing import Iterator, List, Optional, Tuple
19from types import FrameType
20
21import kunit_config
22import qemu_config
23
24KCONFIG_PATH = '.config'
25KUNITCONFIG_PATH = '.kunitconfig'
26OLD_KUNITCONFIG_PATH = 'last_used_kunitconfig'
27DEFAULT_KUNITCONFIG_PATH = 'tools/testing/kunit/configs/default.config'
28ALL_TESTS_CONFIG_PATH = 'tools/testing/kunit/configs/all_tests.config'
29UML_KCONFIG_PATH = 'tools/testing/kunit/configs/arch_uml.config'
30OUTFILE_PATH = 'test.log'
31ABS_TOOL_PATH = os.path.abspath(os.path.dirname(__file__))
32QEMU_CONFIGS_DIR = os.path.join(ABS_TOOL_PATH, 'qemu_configs')
33
34class ConfigError(Exception):
35	"""Represents an error trying to configure the Linux kernel."""
36
37
38class BuildError(Exception):
39	"""Represents an error trying to build the Linux kernel."""
40
41
42class LinuxSourceTreeOperations:
43	"""An abstraction over command line operations performed on a source tree."""
44
45	def __init__(self, linux_arch: str, cross_compile: Optional[str]):
46		self._linux_arch = linux_arch
47		self._cross_compile = cross_compile
48
49	def make_mrproper(self) -> None:
50		try:
51			subprocess.check_output(['make', 'mrproper'], stderr=subprocess.STDOUT)
52		except OSError as e:
53			raise ConfigError('Could not call make command: ' + str(e))
54		except subprocess.CalledProcessError as e:
55			raise ConfigError(e.output.decode())
56
57	def make_arch_config(self, base_kunitconfig: kunit_config.Kconfig) -> kunit_config.Kconfig:
58		return base_kunitconfig
59
60	def make_olddefconfig(self, build_dir: str, make_options: Optional[List[str]]) -> None:
61		command = ['make', 'ARCH=' + self._linux_arch, 'O=' + build_dir, 'olddefconfig']
62		if self._cross_compile:
63			command += ['CROSS_COMPILE=' + self._cross_compile]
64		if make_options:
65			command.extend(make_options)
66		print('Populating config with:\n$', ' '.join(command))
67		try:
68			subprocess.check_output(command, stderr=subprocess.STDOUT)
69		except OSError as e:
70			raise ConfigError('Could not call make command: ' + str(e))
71		except subprocess.CalledProcessError as e:
72			raise ConfigError(e.output.decode())
73
74	def make(self, jobs: int, build_dir: str, make_options: Optional[List[str]]) -> None:
75		command = ['make', 'all', 'compile_commands.json', 'scripts_gdb',
76			   'ARCH=' + self._linux_arch, 'O=' + build_dir, '--jobs=' + str(jobs)]
77		if make_options:
78			command.extend(make_options)
79		if self._cross_compile:
80			command += ['CROSS_COMPILE=' + self._cross_compile]
81		print('Building with:\n$', ' '.join(command))
82		try:
83			proc = subprocess.Popen(command,
84						stderr=subprocess.PIPE,
85						stdout=subprocess.DEVNULL)
86		except OSError as e:
87			raise BuildError('Could not call execute make: ' + str(e))
88		except subprocess.CalledProcessError as e:
89			raise BuildError(e.output)
90		_, stderr = proc.communicate()
91		if proc.returncode != 0:
92			raise BuildError(stderr.decode())
93		if stderr:  # likely only due to build warnings
94			print(stderr.decode())
95
96	def start(self, params: List[str], build_dir: str) -> subprocess.Popen:
97		raise RuntimeError('not implemented!')
98
99
100class LinuxSourceTreeOperationsQemu(LinuxSourceTreeOperations):
101
102	def __init__(self, qemu_arch_params: qemu_config.QemuArchParams, cross_compile: Optional[str]):
103		super().__init__(linux_arch=qemu_arch_params.linux_arch,
104				 cross_compile=cross_compile)
105		self._kconfig = qemu_arch_params.kconfig
106		self._qemu_arch = qemu_arch_params.qemu_arch
107		self._kernel_path = qemu_arch_params.kernel_path
108		self._kernel_command_line = qemu_arch_params.kernel_command_line
109		if 'kunit_shutdown=' not in self._kernel_command_line:
110			self._kernel_command_line += ' kunit_shutdown=reboot'
111		self._extra_qemu_params = qemu_arch_params.extra_qemu_params
112		self._serial = qemu_arch_params.serial
113
114	def make_arch_config(self, base_kunitconfig: kunit_config.Kconfig) -> kunit_config.Kconfig:
115		kconfig = kunit_config.parse_from_string(self._kconfig)
116		kconfig.merge_in_entries(base_kunitconfig)
117		return kconfig
118
119	def start(self, params: List[str], build_dir: str) -> subprocess.Popen:
120		kernel_path = os.path.join(build_dir, self._kernel_path)
121		qemu_command = ['qemu-system-' + self._qemu_arch,
122				'-nodefaults',
123				'-m', '1024',
124				'-kernel', kernel_path,
125				'-append', ' '.join(params + [self._kernel_command_line]),
126				'-no-reboot',
127				'-nographic',
128				'-accel', 'kvm',
129				'-accel', 'hvf',
130				'-accel', 'tcg',
131				'-serial', self._serial] + self._extra_qemu_params
132		# Note: shlex.join() does what we want, but requires python 3.8+.
133		print('Running tests with:\n$', ' '.join(shlex.quote(arg) for arg in qemu_command))
134		return subprocess.Popen(qemu_command,
135					stdin=subprocess.PIPE,
136					stdout=subprocess.PIPE,
137					stderr=subprocess.STDOUT,
138					text=True, errors='backslashreplace')
139
140class LinuxSourceTreeOperationsUml(LinuxSourceTreeOperations):
141	"""An abstraction over command line operations performed on a source tree."""
142
143	def __init__(self, cross_compile: Optional[str]=None):
144		super().__init__(linux_arch='um', cross_compile=cross_compile)
145
146	def make_arch_config(self, base_kunitconfig: kunit_config.Kconfig) -> kunit_config.Kconfig:
147		kconfig = kunit_config.parse_file(UML_KCONFIG_PATH)
148		kconfig.merge_in_entries(base_kunitconfig)
149		return kconfig
150
151	def start(self, params: List[str], build_dir: str) -> subprocess.Popen:
152		"""Runs the Linux UML binary. Must be named 'linux'."""
153		linux_bin = os.path.join(build_dir, 'linux')
154		params.extend(['mem=1G', 'console=tty', 'kunit_shutdown=halt'])
155		print('Running tests with:\n$', linux_bin, ' '.join(shlex.quote(arg) for arg in params))
156		return subprocess.Popen([linux_bin] + params,
157					   stdin=subprocess.PIPE,
158					   stdout=subprocess.PIPE,
159					   stderr=subprocess.STDOUT,
160					   text=True, errors='backslashreplace')
161
162def get_kconfig_path(build_dir: str) -> str:
163	return os.path.join(build_dir, KCONFIG_PATH)
164
165def get_kunitconfig_path(build_dir: str) -> str:
166	return os.path.join(build_dir, KUNITCONFIG_PATH)
167
168def get_old_kunitconfig_path(build_dir: str) -> str:
169	return os.path.join(build_dir, OLD_KUNITCONFIG_PATH)
170
171def get_parsed_kunitconfig(build_dir: str,
172			   kunitconfig_paths: Optional[List[str]]=None) -> kunit_config.Kconfig:
173	if not kunitconfig_paths:
174		path = get_kunitconfig_path(build_dir)
175		if not os.path.exists(path):
176			shutil.copyfile(DEFAULT_KUNITCONFIG_PATH, path)
177		return kunit_config.parse_file(path)
178
179	merged = kunit_config.Kconfig()
180
181	for path in kunitconfig_paths:
182		if os.path.isdir(path):
183			path = os.path.join(path, KUNITCONFIG_PATH)
184		if not os.path.exists(path):
185			raise ConfigError(f'Specified kunitconfig ({path}) does not exist')
186
187		partial = kunit_config.parse_file(path)
188		diff = merged.conflicting_options(partial)
189		if diff:
190			diff_str = '\n\n'.join(f'{a}\n  vs from {path}\n{b}' for a, b in diff)
191			raise ConfigError(f'Multiple values specified for {len(diff)} options in kunitconfig:\n{diff_str}')
192		merged.merge_in_entries(partial)
193	return merged
194
195def get_outfile_path(build_dir: str) -> str:
196	return os.path.join(build_dir, OUTFILE_PATH)
197
198def _default_qemu_config_path(arch: str) -> str:
199	config_path = os.path.join(QEMU_CONFIGS_DIR, arch + '.py')
200	if os.path.isfile(config_path):
201		return config_path
202
203	options = [f[:-3] for f in os.listdir(QEMU_CONFIGS_DIR) if f.endswith('.py')]
204	raise ConfigError(arch + ' is not a valid arch, options are ' + str(sorted(options)))
205
206def _get_qemu_ops(config_path: str,
207		  extra_qemu_args: Optional[List[str]],
208		  cross_compile: Optional[str]) -> Tuple[str, LinuxSourceTreeOperations]:
209	# The module name/path has very little to do with where the actual file
210	# exists (I learned this through experimentation and could not find it
211	# anywhere in the Python documentation).
212	#
213	# Bascially, we completely ignore the actual file location of the config
214	# we are loading and just tell Python that the module lives in the
215	# QEMU_CONFIGS_DIR for import purposes regardless of where it actually
216	# exists as a file.
217	module_path = '.' + os.path.join(os.path.basename(QEMU_CONFIGS_DIR), os.path.basename(config_path))
218	spec = importlib.util.spec_from_file_location(module_path, config_path)
219	assert spec is not None
220	config = importlib.util.module_from_spec(spec)
221	# See https://github.com/python/typeshed/pull/2626 for context.
222	assert isinstance(spec.loader, importlib.abc.Loader)
223	spec.loader.exec_module(config)
224
225	if not hasattr(config, 'QEMU_ARCH'):
226		raise ValueError('qemu_config module missing "QEMU_ARCH": ' + config_path)
227	params: qemu_config.QemuArchParams = config.QEMU_ARCH
228	if extra_qemu_args:
229		params.extra_qemu_params.extend(extra_qemu_args)
230	return params.linux_arch, LinuxSourceTreeOperationsQemu(
231			params, cross_compile=cross_compile)
232
233class LinuxSourceTree:
234	"""Represents a Linux kernel source tree with KUnit tests."""
235
236	def __init__(
237	      self,
238	      build_dir: str,
239	      kunitconfig_paths: Optional[List[str]]=None,
240	      kconfig_add: Optional[List[str]]=None,
241	      arch: Optional[str]=None,
242	      cross_compile: Optional[str]=None,
243	      qemu_config_path: Optional[str]=None,
244	      extra_qemu_args: Optional[List[str]]=None) -> None:
245		signal.signal(signal.SIGINT, self.signal_handler)
246		if qemu_config_path:
247			self._arch, self._ops = _get_qemu_ops(qemu_config_path, extra_qemu_args, cross_compile)
248		else:
249			self._arch = 'um' if arch is None else arch
250			if self._arch == 'um':
251				self._ops = LinuxSourceTreeOperationsUml(cross_compile=cross_compile)
252			else:
253				qemu_config_path = _default_qemu_config_path(self._arch)
254				_, self._ops = _get_qemu_ops(qemu_config_path, extra_qemu_args, cross_compile)
255
256		self._kconfig = get_parsed_kunitconfig(build_dir, kunitconfig_paths)
257		if kconfig_add:
258			kconfig = kunit_config.parse_from_string('\n'.join(kconfig_add))
259			self._kconfig.merge_in_entries(kconfig)
260
261	def arch(self) -> str:
262		return self._arch
263
264	def clean(self) -> bool:
265		try:
266			self._ops.make_mrproper()
267		except ConfigError as e:
268			logging.error(e)
269			return False
270		return True
271
272	def validate_config(self, build_dir: str) -> bool:
273		kconfig_path = get_kconfig_path(build_dir)
274		validated_kconfig = kunit_config.parse_file(kconfig_path)
275		if self._kconfig.is_subset_of(validated_kconfig):
276			return True
277		missing = set(self._kconfig.as_entries()) - set(validated_kconfig.as_entries())
278		message = 'Not all Kconfig options selected in kunitconfig were in the generated .config.\n' \
279			  'This is probably due to unsatisfied dependencies.\n' \
280			  'Missing: ' + ', '.join(str(e) for e in missing)
281		if self._arch == 'um':
282			message += '\nNote: many Kconfig options aren\'t available on UML. You can try running ' \
283				   'on a different architecture with something like "--arch=x86_64".'
284		logging.error(message)
285		return False
286
287	def build_config(self, build_dir: str, make_options: Optional[List[str]]) -> bool:
288		kconfig_path = get_kconfig_path(build_dir)
289		if build_dir and not os.path.exists(build_dir):
290			os.mkdir(build_dir)
291		try:
292			self._kconfig = self._ops.make_arch_config(self._kconfig)
293			self._kconfig.write_to_file(kconfig_path)
294			self._ops.make_olddefconfig(build_dir, make_options)
295		except ConfigError as e:
296			logging.error(e)
297			return False
298		if not self.validate_config(build_dir):
299			return False
300
301		old_path = get_old_kunitconfig_path(build_dir)
302		if os.path.exists(old_path):
303			os.remove(old_path)  # write_to_file appends to the file
304		self._kconfig.write_to_file(old_path)
305		return True
306
307	def _kunitconfig_changed(self, build_dir: str) -> bool:
308		old_path = get_old_kunitconfig_path(build_dir)
309		if not os.path.exists(old_path):
310			return True
311
312		old_kconfig = kunit_config.parse_file(old_path)
313		return old_kconfig != self._kconfig
314
315	def build_reconfig(self, build_dir: str, make_options: Optional[List[str]]) -> bool:
316		"""Creates a new .config if it is not a subset of the .kunitconfig."""
317		kconfig_path = get_kconfig_path(build_dir)
318		if not os.path.exists(kconfig_path):
319			print('Generating .config ...')
320			return self.build_config(build_dir, make_options)
321
322		existing_kconfig = kunit_config.parse_file(kconfig_path)
323		self._kconfig = self._ops.make_arch_config(self._kconfig)
324
325		if self._kconfig.is_subset_of(existing_kconfig) and not self._kunitconfig_changed(build_dir):
326			return True
327		print('Regenerating .config ...')
328		os.remove(kconfig_path)
329		return self.build_config(build_dir, make_options)
330
331	def build_kernel(self, jobs: int, build_dir: str, make_options: Optional[List[str]]) -> bool:
332		try:
333			self._ops.make_olddefconfig(build_dir, make_options)
334			self._ops.make(jobs, build_dir, make_options)
335		except (ConfigError, BuildError) as e:
336			logging.error(e)
337			return False
338		return self.validate_config(build_dir)
339
340	def run_kernel(self, args: Optional[List[str]]=None, build_dir: str='', filter_glob: str='', filter: str='', filter_action: Optional[str]=None, timeout: Optional[int]=None) -> Iterator[str]:
341		if not args:
342			args = []
343		if filter_glob:
344			args.append('kunit.filter_glob=' + filter_glob)
345		if filter:
346			args.append('kunit.filter="' + filter + '"')
347		if filter_action:
348			args.append('kunit.filter_action=' + filter_action)
349		args.append('kunit.enable=1')
350
351		process = self._ops.start(args, build_dir)
352		assert process.stdout is not None  # tell mypy it's set
353
354		# Enforce the timeout in a background thread.
355		def _wait_proc() -> None:
356			try:
357				process.wait(timeout=timeout)
358			except Exception as e:
359				print(e)
360				process.terminate()
361				process.wait()
362		waiter = threading.Thread(target=_wait_proc)
363		waiter.start()
364
365		output = open(get_outfile_path(build_dir), 'w')
366		try:
367			# Tee the output to the file and to our caller in real time.
368			for line in process.stdout:
369				output.write(line)
370				yield line
371		# This runs even if our caller doesn't consume every line.
372		finally:
373			# Flush any leftover output to the file
374			output.write(process.stdout.read())
375			output.close()
376			process.stdout.close()
377
378			waiter.join()
379			subprocess.call(['stty', 'sane'])
380
381	def signal_handler(self, unused_sig: int, unused_frame: Optional[FrameType]) -> None:
382		logging.error('Build interruption occurred. Cleaning console.')
383		subprocess.call(['stty', 'sane'])
384