xref: /linux/tools/testing/kunit/kunit_kernel.py (revision 7f4f3b14e8079ecde096bd734af10e30d40c27b7)
1# SPDX-License-Identifier: GPL-2.0
2#
3# Runs UML kernel, collects output, and handles errors.
4#
5# Copyright (C) 2019, Google LLC.
6# Author: Felix Guo <felixguoxiuping@gmail.com>
7# Author: Brendan Higgins <brendanhiggins@google.com>
8
9import importlib.abc
10import importlib.util
11import logging
12import subprocess
13import os
14import shlex
15import shutil
16import signal
17import threading
18from typing import Iterator, List, Optional, Tuple
19from types import FrameType
20
21import kunit_config
22import qemu_config
23
24KCONFIG_PATH = '.config'
25KUNITCONFIG_PATH = '.kunitconfig'
26OLD_KUNITCONFIG_PATH = 'last_used_kunitconfig'
27DEFAULT_KUNITCONFIG_PATH = 'tools/testing/kunit/configs/default.config'
28ALL_TESTS_CONFIG_PATH = 'tools/testing/kunit/configs/all_tests.config'
29UML_KCONFIG_PATH = 'tools/testing/kunit/configs/arch_uml.config'
30OUTFILE_PATH = 'test.log'
31ABS_TOOL_PATH = os.path.abspath(os.path.dirname(__file__))
32QEMU_CONFIGS_DIR = os.path.join(ABS_TOOL_PATH, 'qemu_configs')
33
34class ConfigError(Exception):
35	"""Represents an error trying to configure the Linux kernel."""
36
37
38class BuildError(Exception):
39	"""Represents an error trying to build the Linux kernel."""
40
41
42class LinuxSourceTreeOperations:
43	"""An abstraction over command line operations performed on a source tree."""
44
45	def __init__(self, linux_arch: str, cross_compile: Optional[str]):
46		self._linux_arch = linux_arch
47		self._cross_compile = cross_compile
48
49	def make_mrproper(self) -> None:
50		try:
51			subprocess.check_output(['make', 'mrproper'], stderr=subprocess.STDOUT)
52		except OSError as e:
53			raise ConfigError('Could not call make command: ' + str(e))
54		except subprocess.CalledProcessError as e:
55			raise ConfigError(e.output.decode())
56
57	def make_arch_config(self, base_kunitconfig: kunit_config.Kconfig) -> kunit_config.Kconfig:
58		return base_kunitconfig
59
60	def make_olddefconfig(self, build_dir: str, make_options: Optional[List[str]]) -> None:
61		command = ['make', 'ARCH=' + self._linux_arch, 'O=' + build_dir, 'olddefconfig']
62		if self._cross_compile:
63			command += ['CROSS_COMPILE=' + self._cross_compile]
64		if make_options:
65			command.extend(make_options)
66		print('Populating config with:\n$', ' '.join(command))
67		try:
68			subprocess.check_output(command, stderr=subprocess.STDOUT)
69		except OSError as e:
70			raise ConfigError('Could not call make command: ' + str(e))
71		except subprocess.CalledProcessError as e:
72			raise ConfigError(e.output.decode())
73
74	def make(self, jobs: int, build_dir: str, make_options: Optional[List[str]]) -> None:
75		command = ['make', 'all', 'compile_commands.json', 'ARCH=' + self._linux_arch,
76			   'O=' + build_dir, '--jobs=' + str(jobs)]
77		if make_options:
78			command.extend(make_options)
79		if self._cross_compile:
80			command += ['CROSS_COMPILE=' + self._cross_compile]
81		print('Building with:\n$', ' '.join(command))
82		try:
83			proc = subprocess.Popen(command,
84						stderr=subprocess.PIPE,
85						stdout=subprocess.DEVNULL)
86		except OSError as e:
87			raise BuildError('Could not call execute make: ' + str(e))
88		except subprocess.CalledProcessError as e:
89			raise BuildError(e.output)
90		_, stderr = proc.communicate()
91		if proc.returncode != 0:
92			raise BuildError(stderr.decode())
93		if stderr:  # likely only due to build warnings
94			print(stderr.decode())
95
96	def start(self, params: List[str], build_dir: str) -> subprocess.Popen:
97		raise RuntimeError('not implemented!')
98
99
100class LinuxSourceTreeOperationsQemu(LinuxSourceTreeOperations):
101
102	def __init__(self, qemu_arch_params: qemu_config.QemuArchParams, cross_compile: Optional[str]):
103		super().__init__(linux_arch=qemu_arch_params.linux_arch,
104				 cross_compile=cross_compile)
105		self._kconfig = qemu_arch_params.kconfig
106		self._qemu_arch = qemu_arch_params.qemu_arch
107		self._kernel_path = qemu_arch_params.kernel_path
108		self._kernel_command_line = qemu_arch_params.kernel_command_line
109		if 'kunit_shutdown=' not in self._kernel_command_line:
110			self._kernel_command_line += ' kunit_shutdown=reboot'
111		self._extra_qemu_params = qemu_arch_params.extra_qemu_params
112		self._serial = qemu_arch_params.serial
113
114	def make_arch_config(self, base_kunitconfig: kunit_config.Kconfig) -> kunit_config.Kconfig:
115		kconfig = kunit_config.parse_from_string(self._kconfig)
116		kconfig.merge_in_entries(base_kunitconfig)
117		return kconfig
118
119	def start(self, params: List[str], build_dir: str) -> subprocess.Popen:
120		kernel_path = os.path.join(build_dir, self._kernel_path)
121		qemu_command = ['qemu-system-' + self._qemu_arch,
122				'-nodefaults',
123				'-m', '1024',
124				'-kernel', kernel_path,
125				'-append', ' '.join(params + [self._kernel_command_line]),
126				'-no-reboot',
127				'-nographic',
128				'-serial', self._serial] + self._extra_qemu_params
129		# Note: shlex.join() does what we want, but requires python 3.8+.
130		print('Running tests with:\n$', ' '.join(shlex.quote(arg) for arg in qemu_command))
131		return subprocess.Popen(qemu_command,
132					stdin=subprocess.PIPE,
133					stdout=subprocess.PIPE,
134					stderr=subprocess.STDOUT,
135					text=True, errors='backslashreplace')
136
137class LinuxSourceTreeOperationsUml(LinuxSourceTreeOperations):
138	"""An abstraction over command line operations performed on a source tree."""
139
140	def __init__(self, cross_compile: Optional[str]=None):
141		super().__init__(linux_arch='um', cross_compile=cross_compile)
142
143	def make_arch_config(self, base_kunitconfig: kunit_config.Kconfig) -> kunit_config.Kconfig:
144		kconfig = kunit_config.parse_file(UML_KCONFIG_PATH)
145		kconfig.merge_in_entries(base_kunitconfig)
146		return kconfig
147
148	def start(self, params: List[str], build_dir: str) -> subprocess.Popen:
149		"""Runs the Linux UML binary. Must be named 'linux'."""
150		linux_bin = os.path.join(build_dir, 'linux')
151		params.extend(['mem=1G', 'console=tty', 'kunit_shutdown=halt'])
152		print('Running tests with:\n$', linux_bin, ' '.join(shlex.quote(arg) for arg in params))
153		return subprocess.Popen([linux_bin] + params,
154					   stdin=subprocess.PIPE,
155					   stdout=subprocess.PIPE,
156					   stderr=subprocess.STDOUT,
157					   text=True, errors='backslashreplace')
158
159def get_kconfig_path(build_dir: str) -> str:
160	return os.path.join(build_dir, KCONFIG_PATH)
161
162def get_kunitconfig_path(build_dir: str) -> str:
163	return os.path.join(build_dir, KUNITCONFIG_PATH)
164
165def get_old_kunitconfig_path(build_dir: str) -> str:
166	return os.path.join(build_dir, OLD_KUNITCONFIG_PATH)
167
168def get_parsed_kunitconfig(build_dir: str,
169			   kunitconfig_paths: Optional[List[str]]=None) -> kunit_config.Kconfig:
170	if not kunitconfig_paths:
171		path = get_kunitconfig_path(build_dir)
172		if not os.path.exists(path):
173			shutil.copyfile(DEFAULT_KUNITCONFIG_PATH, path)
174		return kunit_config.parse_file(path)
175
176	merged = kunit_config.Kconfig()
177
178	for path in kunitconfig_paths:
179		if os.path.isdir(path):
180			path = os.path.join(path, KUNITCONFIG_PATH)
181		if not os.path.exists(path):
182			raise ConfigError(f'Specified kunitconfig ({path}) does not exist')
183
184		partial = kunit_config.parse_file(path)
185		diff = merged.conflicting_options(partial)
186		if diff:
187			diff_str = '\n\n'.join(f'{a}\n  vs from {path}\n{b}' for a, b in diff)
188			raise ConfigError(f'Multiple values specified for {len(diff)} options in kunitconfig:\n{diff_str}')
189		merged.merge_in_entries(partial)
190	return merged
191
192def get_outfile_path(build_dir: str) -> str:
193	return os.path.join(build_dir, OUTFILE_PATH)
194
195def _default_qemu_config_path(arch: str) -> str:
196	config_path = os.path.join(QEMU_CONFIGS_DIR, arch + '.py')
197	if os.path.isfile(config_path):
198		return config_path
199
200	options = [f[:-3] for f in os.listdir(QEMU_CONFIGS_DIR) if f.endswith('.py')]
201	raise ConfigError(arch + ' is not a valid arch, options are ' + str(sorted(options)))
202
203def _get_qemu_ops(config_path: str,
204		  extra_qemu_args: Optional[List[str]],
205		  cross_compile: Optional[str]) -> Tuple[str, LinuxSourceTreeOperations]:
206	# The module name/path has very little to do with where the actual file
207	# exists (I learned this through experimentation and could not find it
208	# anywhere in the Python documentation).
209	#
210	# Bascially, we completely ignore the actual file location of the config
211	# we are loading and just tell Python that the module lives in the
212	# QEMU_CONFIGS_DIR for import purposes regardless of where it actually
213	# exists as a file.
214	module_path = '.' + os.path.join(os.path.basename(QEMU_CONFIGS_DIR), os.path.basename(config_path))
215	spec = importlib.util.spec_from_file_location(module_path, config_path)
216	assert spec is not None
217	config = importlib.util.module_from_spec(spec)
218	# See https://github.com/python/typeshed/pull/2626 for context.
219	assert isinstance(spec.loader, importlib.abc.Loader)
220	spec.loader.exec_module(config)
221
222	if not hasattr(config, 'QEMU_ARCH'):
223		raise ValueError('qemu_config module missing "QEMU_ARCH": ' + config_path)
224	params: qemu_config.QemuArchParams = config.QEMU_ARCH
225	if extra_qemu_args:
226		params.extra_qemu_params.extend(extra_qemu_args)
227	return params.linux_arch, LinuxSourceTreeOperationsQemu(
228			params, cross_compile=cross_compile)
229
230class LinuxSourceTree:
231	"""Represents a Linux kernel source tree with KUnit tests."""
232
233	def __init__(
234	      self,
235	      build_dir: str,
236	      kunitconfig_paths: Optional[List[str]]=None,
237	      kconfig_add: Optional[List[str]]=None,
238	      arch: Optional[str]=None,
239	      cross_compile: Optional[str]=None,
240	      qemu_config_path: Optional[str]=None,
241	      extra_qemu_args: Optional[List[str]]=None) -> None:
242		signal.signal(signal.SIGINT, self.signal_handler)
243		if qemu_config_path:
244			self._arch, self._ops = _get_qemu_ops(qemu_config_path, extra_qemu_args, cross_compile)
245		else:
246			self._arch = 'um' if arch is None else arch
247			if self._arch == 'um':
248				self._ops = LinuxSourceTreeOperationsUml(cross_compile=cross_compile)
249			else:
250				qemu_config_path = _default_qemu_config_path(self._arch)
251				_, self._ops = _get_qemu_ops(qemu_config_path, extra_qemu_args, cross_compile)
252
253		self._kconfig = get_parsed_kunitconfig(build_dir, kunitconfig_paths)
254		if kconfig_add:
255			kconfig = kunit_config.parse_from_string('\n'.join(kconfig_add))
256			self._kconfig.merge_in_entries(kconfig)
257
258	def arch(self) -> str:
259		return self._arch
260
261	def clean(self) -> bool:
262		try:
263			self._ops.make_mrproper()
264		except ConfigError as e:
265			logging.error(e)
266			return False
267		return True
268
269	def validate_config(self, build_dir: str) -> bool:
270		kconfig_path = get_kconfig_path(build_dir)
271		validated_kconfig = kunit_config.parse_file(kconfig_path)
272		if self._kconfig.is_subset_of(validated_kconfig):
273			return True
274		missing = set(self._kconfig.as_entries()) - set(validated_kconfig.as_entries())
275		message = 'Not all Kconfig options selected in kunitconfig were in the generated .config.\n' \
276			  'This is probably due to unsatisfied dependencies.\n' \
277			  'Missing: ' + ', '.join(str(e) for e in missing)
278		if self._arch == 'um':
279			message += '\nNote: many Kconfig options aren\'t available on UML. You can try running ' \
280				   'on a different architecture with something like "--arch=x86_64".'
281		logging.error(message)
282		return False
283
284	def build_config(self, build_dir: str, make_options: Optional[List[str]]) -> bool:
285		kconfig_path = get_kconfig_path(build_dir)
286		if build_dir and not os.path.exists(build_dir):
287			os.mkdir(build_dir)
288		try:
289			self._kconfig = self._ops.make_arch_config(self._kconfig)
290			self._kconfig.write_to_file(kconfig_path)
291			self._ops.make_olddefconfig(build_dir, make_options)
292		except ConfigError as e:
293			logging.error(e)
294			return False
295		if not self.validate_config(build_dir):
296			return False
297
298		old_path = get_old_kunitconfig_path(build_dir)
299		if os.path.exists(old_path):
300			os.remove(old_path)  # write_to_file appends to the file
301		self._kconfig.write_to_file(old_path)
302		return True
303
304	def _kunitconfig_changed(self, build_dir: str) -> bool:
305		old_path = get_old_kunitconfig_path(build_dir)
306		if not os.path.exists(old_path):
307			return True
308
309		old_kconfig = kunit_config.parse_file(old_path)
310		return old_kconfig != self._kconfig
311
312	def build_reconfig(self, build_dir: str, make_options: Optional[List[str]]) -> bool:
313		"""Creates a new .config if it is not a subset of the .kunitconfig."""
314		kconfig_path = get_kconfig_path(build_dir)
315		if not os.path.exists(kconfig_path):
316			print('Generating .config ...')
317			return self.build_config(build_dir, make_options)
318
319		existing_kconfig = kunit_config.parse_file(kconfig_path)
320		self._kconfig = self._ops.make_arch_config(self._kconfig)
321
322		if self._kconfig.is_subset_of(existing_kconfig) and not self._kunitconfig_changed(build_dir):
323			return True
324		print('Regenerating .config ...')
325		os.remove(kconfig_path)
326		return self.build_config(build_dir, make_options)
327
328	def build_kernel(self, jobs: int, build_dir: str, make_options: Optional[List[str]]) -> bool:
329		try:
330			self._ops.make_olddefconfig(build_dir, make_options)
331			self._ops.make(jobs, build_dir, make_options)
332		except (ConfigError, BuildError) as e:
333			logging.error(e)
334			return False
335		return self.validate_config(build_dir)
336
337	def run_kernel(self, args: Optional[List[str]]=None, build_dir: str='', filter_glob: str='', filter: str='', filter_action: Optional[str]=None, timeout: Optional[int]=None) -> Iterator[str]:
338		if not args:
339			args = []
340		if filter_glob:
341			args.append('kunit.filter_glob=' + filter_glob)
342		if filter:
343			args.append('kunit.filter="' + filter + '"')
344		if filter_action:
345			args.append('kunit.filter_action=' + filter_action)
346		args.append('kunit.enable=1')
347
348		process = self._ops.start(args, build_dir)
349		assert process.stdout is not None  # tell mypy it's set
350
351		# Enforce the timeout in a background thread.
352		def _wait_proc() -> None:
353			try:
354				process.wait(timeout=timeout)
355			except Exception as e:
356				print(e)
357				process.terminate()
358				process.wait()
359		waiter = threading.Thread(target=_wait_proc)
360		waiter.start()
361
362		output = open(get_outfile_path(build_dir), 'w')
363		try:
364			# Tee the output to the file and to our caller in real time.
365			for line in process.stdout:
366				output.write(line)
367				yield line
368		# This runs even if our caller doesn't consume every line.
369		finally:
370			# Flush any leftover output to the file
371			output.write(process.stdout.read())
372			output.close()
373			process.stdout.close()
374
375			waiter.join()
376			subprocess.call(['stty', 'sane'])
377
378	def signal_handler(self, unused_sig: int, unused_frame: Optional[FrameType]) -> None:
379		logging.error('Build interruption occurred. Cleaning console.')
380		subprocess.call(['stty', 'sane'])
381