# SPDX-License-Identifier: GPL-2.0
#
# Runs UML kernel, collects output, and handles errors.
#
# Copyright (C) 2019, Google LLC.
# Author: Felix Guo <felixguoxiuping@gmail.com>
# Author: Brendan Higgins <brendanhiggins@google.com>

import importlib.abc
import importlib.util
import logging
import subprocess
import os
import shlex
import shutil
import signal
import threading
from typing import Iterator, List, Optional, Tuple
from types import FrameType

import kunit_config
import qemu_config

KCONFIG_PATH = '.config'
KUNITCONFIG_PATH = '.kunitconfig'
OLD_KUNITCONFIG_PATH = 'last_used_kunitconfig'
DEFAULT_KUNITCONFIG_PATH = 'tools/testing/kunit/configs/default.config'
ALL_TESTS_CONFIG_PATH = 'tools/testing/kunit/configs/all_tests.config'
UML_KCONFIG_PATH = 'tools/testing/kunit/configs/arch_uml.config'
OUTFILE_PATH = 'test.log'
ABS_TOOL_PATH = os.path.abspath(os.path.dirname(__file__))
QEMU_CONFIGS_DIR = os.path.join(ABS_TOOL_PATH, 'qemu_configs')


class ConfigError(Exception):
    """Represents an error trying to configure the Linux kernel."""


class BuildError(Exception):
    """Represents an error trying to build the Linux kernel."""


class LinuxSourceTreeOperations:
    """An abstraction over command line operations performed on a source tree."""

    def __init__(self, linux_arch: str, cross_compile: Optional[str]):
        self._linux_arch = linux_arch
        self._cross_compile = cross_compile

    def make_mrproper(self) -> None:
        """Runs 'make mrproper'.

        Raises:
            ConfigError: if make cannot be executed or exits non-zero.
        """
        try:
            subprocess.check_output(['make', 'mrproper'], stderr=subprocess.STDOUT)
        except OSError as e:
            raise ConfigError('Could not call make command: ' + str(e)) from e
        except subprocess.CalledProcessError as e:
            raise ConfigError(e.output.decode()) from e

    def make_arch_config(self, base_kunitconfig: kunit_config.Kconfig) -> kunit_config.Kconfig:
        """Returns the Kconfig to use for this arch; the base class returns it unchanged."""
        return base_kunitconfig

    def make_olddefconfig(self, build_dir: str, make_options: Optional[List[str]]) -> None:
        """Runs 'make olddefconfig' for this arch with O=build_dir.

        Raises:
            ConfigError: if make cannot be executed or exits non-zero.
        """
        command = ['make', 'ARCH=' + self._linux_arch, 'O=' + build_dir, 'olddefconfig']
        if self._cross_compile:
            command += ['CROSS_COMPILE=' + self._cross_compile]
        if make_options:
            command.extend(make_options)
        print('Populating config with:\n$', ' '.join(command))
        try:
            subprocess.check_output(command, stderr=subprocess.STDOUT)
        except OSError as e:
            raise ConfigError('Could not call make command: ' + str(e)) from e
        except subprocess.CalledProcessError as e:
            raise ConfigError(e.output.decode()) from e

    def make(self, jobs: int, build_dir: str, make_options: Optional[List[str]]) -> None:
        """Builds the 'all' and 'compile_commands.json' targets with O=build_dir.

        stdout of make is discarded; stderr is captured and printed if the
        build succeeds but still wrote to it.

        Raises:
            BuildError: if make cannot be executed or exits non-zero.
        """
        command = ['make', 'all', 'compile_commands.json', 'ARCH=' + self._linux_arch,
                   'O=' + build_dir, '--jobs=' + str(jobs)]
        if make_options:
            command.extend(make_options)
        if self._cross_compile:
            command += ['CROSS_COMPILE=' + self._cross_compile]
        print('Building with:\n$', ' '.join(command))
        # Popen() itself never raises CalledProcessError; a failing build is
        # detected through the return code below.
        try:
            proc = subprocess.Popen(command,
                                    stderr=subprocess.PIPE,
                                    stdout=subprocess.DEVNULL)
        except OSError as e:
            raise BuildError('Could not call execute make: ' + str(e)) from e
        _, stderr = proc.communicate()
        if proc.returncode != 0:
            raise BuildError(stderr.decode())
        if stderr:  # likely only due to build warnings
            print(stderr.decode())

    def start(self, params: List[str], build_dir: str) -> subprocess.Popen:
        """Starts the kernel; must be overridden by subclasses."""
        raise RuntimeError('not implemented!')


class LinuxSourceTreeOperationsQemu(LinuxSourceTreeOperations):
    """Source tree operations that boot the built kernel with qemu-system-<arch>."""

    def __init__(self, qemu_arch_params: qemu_config.QemuArchParams, cross_compile: Optional[str]):
        super().__init__(linux_arch=qemu_arch_params.linux_arch,
                         cross_compile=cross_compile)
        self._kconfig = qemu_arch_params.kconfig
        self._qemu_arch = qemu_arch_params.qemu_arch
        self._kernel_path = qemu_arch_params.kernel_path
        self._kernel_command_line = qemu_arch_params.kernel_command_line
        # Only add a default shutdown mode if the config did not pick one.
        if 'kunit_shutdown=' not in self._kernel_command_line:
            self._kernel_command_line += ' kunit_shutdown=reboot'
        self._extra_qemu_params = qemu_arch_params.extra_qemu_params
        self._serial = qemu_arch_params.serial

    def make_arch_config(self, base_kunitconfig: kunit_config.Kconfig) -> kunit_config.Kconfig:
        """Returns the QEMU config's Kconfig with base_kunitconfig merged in."""
        kconfig = kunit_config.parse_from_string(self._kconfig)
        kconfig.merge_in_entries(base_kunitconfig)
        return kconfig

    def start(self, params: List[str], build_dir: str) -> subprocess.Popen:
        """Launches QEMU on the kernel image found under build_dir.

        params are joined with the configured kernel command line and passed
        via '-append'. The returned process has stderr merged into a text-mode
        stdout pipe.
        """
        kernel_path = os.path.join(build_dir, self._kernel_path)
        qemu_command = ['qemu-system-' + self._qemu_arch,
                        '-nodefaults',
                        '-m', '1024',
                        '-kernel', kernel_path,
                        '-append', ' '.join(params + [self._kernel_command_line]),
                        '-no-reboot',
                        '-nographic',
                        '-serial', self._serial] + self._extra_qemu_params
        # Note: shlex.join() does what we want, but requires python 3.8+.
        print('Running tests with:\n$', ' '.join(shlex.quote(arg) for arg in qemu_command))
        return subprocess.Popen(qemu_command,
                                stdin=subprocess.PIPE,
                                stdout=subprocess.PIPE,
                                stderr=subprocess.STDOUT,
                                text=True, errors='backslashreplace')


class LinuxSourceTreeOperationsUml(LinuxSourceTreeOperations):
    """Source tree operations for the 'um' arch, run as a 'linux' binary from build_dir."""

    def __init__(self, cross_compile: Optional[str]=None):
        super().__init__(linux_arch='um', cross_compile=cross_compile)

    def make_arch_config(self, base_kunitconfig: kunit_config.Kconfig) -> kunit_config.Kconfig:
        """Returns the Kconfig parsed from UML_KCONFIG_PATH with base_kunitconfig merged in."""
        kconfig = kunit_config.parse_file(UML_KCONFIG_PATH)
        kconfig.merge_in_entries(base_kunitconfig)
        return kconfig

    def start(self, params: List[str], build_dir: str) -> subprocess.Popen:
        """Runs the Linux UML binary. Must be named 'linux'."""
        linux_bin = os.path.join(build_dir, 'linux')
        # Build a new list instead of extending the caller's list in place.
        uml_params = params + ['mem=1G', 'console=tty', 'kunit_shutdown=halt']
        print('Running tests with:\n$', linux_bin, ' '.join(shlex.quote(arg) for arg in uml_params))
        return subprocess.Popen([linux_bin] + uml_params,
                                stdin=subprocess.PIPE,
                                stdout=subprocess.PIPE,
                                stderr=subprocess.STDOUT,
                                text=True, errors='backslashreplace')


def get_kconfig_path(build_dir: str) -> str:
    """Returns the path of the .config file inside build_dir."""
    return os.path.join(build_dir, KCONFIG_PATH)


def get_kunitconfig_path(build_dir: str) -> str:
    """Returns the path of the .kunitconfig file inside build_dir."""
    return os.path.join(build_dir, KUNITCONFIG_PATH)


def get_old_kunitconfig_path(build_dir: str) -> str:
    """Returns the path of the last-used kunitconfig file inside build_dir."""
    return os.path.join(build_dir, OLD_KUNITCONFIG_PATH)


def get_parsed_kunitconfig(build_dir: str,
                           kunitconfig_paths: Optional[List[str]]=None) -> kunit_config.Kconfig:
    """Parses the kunitconfig(s) to use.

    With no kunitconfig_paths, build_dir's .kunitconfig is used and is first
    created from DEFAULT_KUNITCONFIG_PATH if it does not exist. Otherwise all
    given paths are merged; a directory path means the .kunitconfig inside it.

    Raises:
        ConfigError: if a given path does not exist, or if two of the given
            files specify conflicting values for an option.
    """
    if not kunitconfig_paths:
        path = get_kunitconfig_path(build_dir)
        if not os.path.exists(path):
            shutil.copyfile(DEFAULT_KUNITCONFIG_PATH, path)
        return kunit_config.parse_file(path)

    merged = kunit_config.Kconfig()

    for path in kunitconfig_paths:
        if os.path.isdir(path):
            path = os.path.join(path, KUNITCONFIG_PATH)
        if not os.path.exists(path):
            raise ConfigError(f'Specified kunitconfig ({path}) does not exist')

        partial = kunit_config.parse_file(path)
        diff = merged.conflicting_options(partial)
        if diff:
            diff_str = '\n\n'.join(f'{a}\n vs from {path}\n{b}' for a, b in diff)
            raise ConfigError(f'Multiple values specified for {len(diff)} options in kunitconfig:\n{diff_str}')
        merged.merge_in_entries(partial)
    return merged


def get_outfile_path(build_dir: str) -> str:
    """Returns the path of the test output log inside build_dir."""
    return os.path.join(build_dir, OUTFILE_PATH)


def _default_qemu_config_path(arch: str) -> str:
    """Returns the path of QEMU_CONFIGS_DIR/<arch>.py.

    Raises:
        ConfigError: if no such file exists; the message lists the available
            config names.
    """
    config_path = os.path.join(QEMU_CONFIGS_DIR, arch + '.py')
    if os.path.isfile(config_path):
        return config_path

    options = [f[:-3] for f in os.listdir(QEMU_CONFIGS_DIR) if f.endswith('.py')]
    raise ConfigError(arch + ' is not a valid arch, options are ' + str(sorted(options)))


def _get_qemu_ops(config_path: str,
                  extra_qemu_args: Optional[List[str]],
                  cross_compile: Optional[str]) -> Tuple[str, LinuxSourceTreeOperations]:
    """Loads the QEMU config module at config_path and builds operations from it.

    Returns:
        A (linux_arch, operations) tuple taken from the module's QEMU_ARCH.

    Raises:
        ValueError: if the loaded module does not define QEMU_ARCH.
    """
    # The module name/path has very little to do with where the actual file
    # exists (I learned this through experimentation and could not find it
    # anywhere in the Python documentation).
    #
    # Basically, we completely ignore the actual file location of the config
    # we are loading and just tell Python that the module lives in the
    # QEMU_CONFIGS_DIR for import purposes regardless of where it actually
    # exists as a file.
    module_path = '.' + os.path.join(os.path.basename(QEMU_CONFIGS_DIR), os.path.basename(config_path))
    spec = importlib.util.spec_from_file_location(module_path, config_path)
    assert spec is not None
    config = importlib.util.module_from_spec(spec)
    # See https://github.com/python/typeshed/pull/2626 for context.
    assert isinstance(spec.loader, importlib.abc.Loader)
    spec.loader.exec_module(config)

    if not hasattr(config, 'QEMU_ARCH'):
        raise ValueError('qemu_config module missing "QEMU_ARCH": ' + config_path)
    params: qemu_config.QemuArchParams = config.QEMU_ARCH
    if extra_qemu_args:
        params.extra_qemu_params.extend(extra_qemu_args)
    return params.linux_arch, LinuxSourceTreeOperationsQemu(
        params, cross_compile=cross_compile)


class LinuxSourceTree:
    """Represents a Linux kernel source tree with KUnit tests."""

    def __init__(
            self,
            build_dir: str,
            kunitconfig_paths: Optional[List[str]]=None,
            kconfig_add: Optional[List[str]]=None,
            arch: Optional[str]=None,
            cross_compile: Optional[str]=None,
            qemu_config_path: Optional[str]=None,
            extra_qemu_args: Optional[List[str]]=None) -> None:
        """Installs the SIGINT handler, picks the operations and parses the kunitconfig.

        An explicit qemu_config_path takes precedence over arch; otherwise
        arch defaults to 'um'. Entries in kconfig_add are merged into the
        parsed kunitconfig.
        """
        signal.signal(signal.SIGINT, self.signal_handler)
        if qemu_config_path:
            self._arch, self._ops = _get_qemu_ops(qemu_config_path, extra_qemu_args, cross_compile)
        else:
            self._arch = 'um' if arch is None else arch
            if self._arch == 'um':
                self._ops = LinuxSourceTreeOperationsUml(cross_compile=cross_compile)
            else:
                qemu_config_path = _default_qemu_config_path(self._arch)
                _, self._ops = _get_qemu_ops(qemu_config_path, extra_qemu_args, cross_compile)

        self._kconfig = get_parsed_kunitconfig(build_dir, kunitconfig_paths)
        if kconfig_add:
            kconfig = kunit_config.parse_from_string('\n'.join(kconfig_add))
            self._kconfig.merge_in_entries(kconfig)

    def arch(self) -> str:
        """Returns the Linux arch name selected for this tree."""
        return self._arch

    def clean(self) -> bool:
        """Runs 'make mrproper'; logs the error and returns False on failure."""
        try:
            self._ops.make_mrproper()
        except ConfigError as e:
            logging.error(e)
            return False
        return True

    def validate_config(self, build_dir: str) -> bool:
        """Checks that build_dir's .config contains every requested option.

        Logs the missing entries and returns False if it does not.
        """
        kconfig_path = get_kconfig_path(build_dir)
        validated_kconfig = kunit_config.parse_file(kconfig_path)
        if self._kconfig.is_subset_of(validated_kconfig):
            return True
        missing = set(self._kconfig.as_entries()) - set(validated_kconfig.as_entries())
        message = 'Not all Kconfig options selected in kunitconfig were in the generated .config.\n' \
            'This is probably due to unsatisfied dependencies.\n' \
            'Missing: ' + ', '.join(str(e) for e in missing)
        if self._arch == 'um':
            message += '\nNote: many Kconfig options aren\'t available on UML. You can try running ' \
                'on a different architecture with something like "--arch=x86_64".'
        logging.error(message)
        return False

    def build_config(self, build_dir: str, make_options: Optional[List[str]]) -> bool:
        """Writes the .config, runs olddefconfig and validates the result.

        On success the config that was used is also saved as the last-used
        kunitconfig in build_dir. Returns False (after logging) on failure.
        """
        kconfig_path = get_kconfig_path(build_dir)
        if build_dir and not os.path.exists(build_dir):
            # makedirs() also handles a build_dir whose parents do not exist yet.
            os.makedirs(build_dir)
        try:
            self._kconfig = self._ops.make_arch_config(self._kconfig)
            self._kconfig.write_to_file(kconfig_path)
            self._ops.make_olddefconfig(build_dir, make_options)
        except ConfigError as e:
            logging.error(e)
            return False
        if not self.validate_config(build_dir):
            return False

        old_path = get_old_kunitconfig_path(build_dir)
        if os.path.exists(old_path):
            os.remove(old_path)  # write_to_file appends to the file
        self._kconfig.write_to_file(old_path)
        return True

    def _kunitconfig_changed(self, build_dir: str) -> bool:
        """Returns True if there is no last-used kunitconfig or it differs from the current one."""
        old_path = get_old_kunitconfig_path(build_dir)
        if not os.path.exists(old_path):
            return True

        old_kconfig = kunit_config.parse_file(old_path)
        return old_kconfig != self._kconfig

    def build_reconfig(self, build_dir: str, make_options: Optional[List[str]]) -> bool:
        """Creates a new .config if it is not a subset of the .kunitconfig."""
        kconfig_path = get_kconfig_path(build_dir)
        if not os.path.exists(kconfig_path):
            print('Generating .config ...')
            return self.build_config(build_dir, make_options)

        existing_kconfig = kunit_config.parse_file(kconfig_path)
        self._kconfig = self._ops.make_arch_config(self._kconfig)

        if self._kconfig.is_subset_of(existing_kconfig) and not self._kunitconfig_changed(build_dir):
            return True
        print('Regenerating .config ...')
        os.remove(kconfig_path)
        return self.build_config(build_dir, make_options)

    def build_kernel(self, jobs: int, build_dir: str, make_options: Optional[List[str]]) -> bool:
        """Runs olddefconfig and the build, then validates the resulting .config.

        Returns False (after logging) if configuring or building fails.
        """
        try:
            self._ops.make_olddefconfig(build_dir, make_options)
            self._ops.make(jobs, build_dir, make_options)
        except (ConfigError, BuildError) as e:
            logging.error(e)
            return False
        return self.validate_config(build_dir)

    def run_kernel(self,
                   args: Optional[List[str]]=None,
                   build_dir: str='',
                   filter_glob: str='',
                   filter: str='',
                   filter_action: Optional[str]=None,
                   timeout: Optional[int]=None) -> Iterator[str]:
        """Starts the kernel and yields its output line by line.

        Every line is also written to the output log in build_dir. The
        kunit.* filter arguments and 'kunit.enable=1' are added to a copy of
        args, so the caller's list is left untouched.
        """
        # Copy so that repeated calls with the same list do not accumulate arguments.
        args = list(args) if args else []
        if filter_glob:
            args.append('kunit.filter_glob=' + filter_glob)
        if filter:
            args.append('kunit.filter="' + filter + '"')
        if filter_action:
            args.append('kunit.filter_action=' + filter_action)
        args.append('kunit.enable=1')

        process = self._ops.start(args, build_dir)
        assert process.stdout is not None  # tell mypy it's set

        # Enforce the timeout in a background thread.
        def _wait_proc() -> None:
            try:
                process.wait(timeout=timeout)
            except Exception as e:
                print(e)
                process.terminate()
                process.wait()
        waiter = threading.Thread(target=_wait_proc)
        waiter.start()

        # The context manager closes the log file even if the final write fails.
        with open(get_outfile_path(build_dir), 'w') as output:
            try:
                # Tee the output to the file and to our caller in real time.
                for line in process.stdout:
                    output.write(line)
                    yield line
            # This runs even if our caller doesn't consume every line.
            finally:
                # Flush any leftover output to the file
                output.write(process.stdout.read())
                process.stdout.close()

                waiter.join()
                subprocess.call(['stty', 'sane'])

    def signal_handler(self, unused_sig: int, unused_frame: Optional[FrameType]) -> None:
        """SIGINT handler: logs the interruption and resets the terminal with 'stty sane'."""
        logging.error('Build interruption occurred. Cleaning console.')
        subprocess.call(['stty', 'sane'])