1# SPDX-License-Identifier: GPL-2.0 2# 3# Runs UML kernel, collects output, and handles errors. 4# 5# Copyright (C) 2019, Google LLC. 6# Author: Felix Guo <felixguoxiuping@gmail.com> 7# Author: Brendan Higgins <brendanhiggins@google.com> 8 9import importlib.abc 10import importlib.util 11import logging 12import subprocess 13import os 14import shlex 15import shutil 16import signal 17import threading 18from typing import Iterator, List, Optional, Tuple 19from types import FrameType 20 21import kunit_config 22import qemu_config 23 24KCONFIG_PATH = '.config' 25KUNITCONFIG_PATH = '.kunitconfig' 26OLD_KUNITCONFIG_PATH = 'last_used_kunitconfig' 27DEFAULT_KUNITCONFIG_PATH = 'tools/testing/kunit/configs/default.config' 28ALL_TESTS_CONFIG_PATH = 'tools/testing/kunit/configs/all_tests.config' 29UML_KCONFIG_PATH = 'tools/testing/kunit/configs/arch_uml.config' 30OUTFILE_PATH = 'test.log' 31ABS_TOOL_PATH = os.path.abspath(os.path.dirname(__file__)) 32QEMU_CONFIGS_DIR = os.path.join(ABS_TOOL_PATH, 'qemu_configs') 33 34class ConfigError(Exception): 35 """Represents an error trying to configure the Linux kernel.""" 36 37 38class BuildError(Exception): 39 """Represents an error trying to build the Linux kernel.""" 40 41 42class LinuxSourceTreeOperations: 43 """An abstraction over command line operations performed on a source tree.""" 44 45 def __init__(self, linux_arch: str, cross_compile: Optional[str]): 46 self._linux_arch = linux_arch 47 self._cross_compile = cross_compile 48 49 def make_mrproper(self) -> None: 50 try: 51 subprocess.check_output(['make', 'mrproper'], stderr=subprocess.STDOUT) 52 except OSError as e: 53 raise ConfigError('Could not call make command: ' + str(e)) 54 except subprocess.CalledProcessError as e: 55 raise ConfigError(e.output.decode()) 56 57 def make_arch_config(self, base_kunitconfig: kunit_config.Kconfig) -> kunit_config.Kconfig: 58 return base_kunitconfig 59 60 def make_olddefconfig(self, build_dir: str, make_options: Optional[List[str]]) -> None: 61 command = ['make', 'ARCH=' + self._linux_arch, 'O=' + build_dir, 'olddefconfig'] 62 if self._cross_compile: 63 command += ['CROSS_COMPILE=' + self._cross_compile] 64 if make_options: 65 command.extend(make_options) 66 print('Populating config with:\n$', ' '.join(command)) 67 try: 68 subprocess.check_output(command, stderr=subprocess.STDOUT) 69 except OSError as e: 70 raise ConfigError('Could not call make command: ' + str(e)) 71 except subprocess.CalledProcessError as e: 72 raise ConfigError(e.output.decode()) 73 74 def make(self, jobs: int, build_dir: str, make_options: Optional[List[str]]) -> None: 75 command = ['make', 'all', 'compile_commands.json', 'scripts_gdb', 76 'ARCH=' + self._linux_arch, 'O=' + build_dir, '--jobs=' + str(jobs)] 77 if make_options: 78 command.extend(make_options) 79 if self._cross_compile: 80 command += ['CROSS_COMPILE=' + self._cross_compile] 81 print('Building with:\n$', ' '.join(command)) 82 try: 83 proc = subprocess.Popen(command, 84 stderr=subprocess.PIPE, 85 stdout=subprocess.DEVNULL) 86 except OSError as e: 87 raise BuildError('Could not call execute make: ' + str(e)) 88 except subprocess.CalledProcessError as e: 89 raise BuildError(e.output) 90 _, stderr = proc.communicate() 91 if proc.returncode != 0: 92 raise BuildError(stderr.decode()) 93 if stderr: # likely only due to build warnings 94 print(stderr.decode()) 95 96 def start(self, params: List[str], build_dir: str) -> subprocess.Popen: 97 raise RuntimeError('not implemented!') 98 99 100class LinuxSourceTreeOperationsQemu(LinuxSourceTreeOperations): 101 102 def __init__(self, qemu_arch_params: qemu_config.QemuArchParams, cross_compile: Optional[str]): 103 super().__init__(linux_arch=qemu_arch_params.linux_arch, 104 cross_compile=cross_compile) 105 self._kconfig = qemu_arch_params.kconfig 106 self._qemu_arch = qemu_arch_params.qemu_arch 107 self._kernel_path = qemu_arch_params.kernel_path 108 self._kernel_command_line = qemu_arch_params.kernel_command_line 109 if 'kunit_shutdown=' not in self._kernel_command_line: 110 self._kernel_command_line += ' kunit_shutdown=reboot' 111 self._extra_qemu_params = qemu_arch_params.extra_qemu_params 112 self._serial = qemu_arch_params.serial 113 114 def make_arch_config(self, base_kunitconfig: kunit_config.Kconfig) -> kunit_config.Kconfig: 115 kconfig = kunit_config.parse_from_string(self._kconfig) 116 kconfig.merge_in_entries(base_kunitconfig) 117 return kconfig 118 119 def start(self, params: List[str], build_dir: str) -> subprocess.Popen: 120 kernel_path = os.path.join(build_dir, self._kernel_path) 121 qemu_command = ['qemu-system-' + self._qemu_arch, 122 '-nodefaults', 123 '-m', '1024', 124 '-kernel', kernel_path, 125 '-append', ' '.join(params + [self._kernel_command_line]), 126 '-no-reboot', 127 '-nographic', 128 '-accel', 'kvm', 129 '-accel', 'hvf', 130 '-accel', 'tcg', 131 '-serial', self._serial] + self._extra_qemu_params 132 # Note: shlex.join() does what we want, but requires python 3.8+. 133 print('Running tests with:\n$', ' '.join(shlex.quote(arg) for arg in qemu_command)) 134 return subprocess.Popen(qemu_command, 135 stdin=subprocess.PIPE, 136 stdout=subprocess.PIPE, 137 stderr=subprocess.STDOUT, 138 text=True, errors='backslashreplace') 139 140class LinuxSourceTreeOperationsUml(LinuxSourceTreeOperations): 141 """An abstraction over command line operations performed on a source tree.""" 142 143 def __init__(self, cross_compile: Optional[str]=None): 144 super().__init__(linux_arch='um', cross_compile=cross_compile) 145 146 def make_arch_config(self, base_kunitconfig: kunit_config.Kconfig) -> kunit_config.Kconfig: 147 kconfig = kunit_config.parse_file(UML_KCONFIG_PATH) 148 kconfig.merge_in_entries(base_kunitconfig) 149 return kconfig 150 151 def start(self, params: List[str], build_dir: str) -> subprocess.Popen: 152 """Runs the Linux UML binary. Must be named 'linux'.""" 153 linux_bin = os.path.join(build_dir, 'linux') 154 params.extend(['mem=1G', 'console=tty', 'kunit_shutdown=halt']) 155 print('Running tests with:\n$', linux_bin, ' '.join(shlex.quote(arg) for arg in params)) 156 return subprocess.Popen([linux_bin] + params, 157 stdin=subprocess.PIPE, 158 stdout=subprocess.PIPE, 159 stderr=subprocess.STDOUT, 160 text=True, errors='backslashreplace') 161 162def get_kconfig_path(build_dir: str) -> str: 163 return os.path.join(build_dir, KCONFIG_PATH) 164 165def get_kunitconfig_path(build_dir: str) -> str: 166 return os.path.join(build_dir, KUNITCONFIG_PATH) 167 168def get_old_kunitconfig_path(build_dir: str) -> str: 169 return os.path.join(build_dir, OLD_KUNITCONFIG_PATH) 170 171def get_parsed_kunitconfig(build_dir: str, 172 kunitconfig_paths: Optional[List[str]]=None) -> kunit_config.Kconfig: 173 if not kunitconfig_paths: 174 path = get_kunitconfig_path(build_dir) 175 if not os.path.exists(path): 176 shutil.copyfile(DEFAULT_KUNITCONFIG_PATH, path) 177 return kunit_config.parse_file(path) 178 179 merged = kunit_config.Kconfig() 180 181 for path in kunitconfig_paths: 182 if os.path.isdir(path): 183 path = os.path.join(path, KUNITCONFIG_PATH) 184 if not os.path.exists(path): 185 raise ConfigError(f'Specified kunitconfig ({path}) does not exist') 186 187 partial = kunit_config.parse_file(path) 188 diff = merged.conflicting_options(partial) 189 if diff: 190 diff_str = '\n\n'.join(f'{a}\n vs from {path}\n{b}' for a, b in diff) 191 raise ConfigError(f'Multiple values specified for {len(diff)} options in kunitconfig:\n{diff_str}') 192 merged.merge_in_entries(partial) 193 return merged 194 195def get_outfile_path(build_dir: str) -> str: 196 return os.path.join(build_dir, OUTFILE_PATH) 197 198def _default_qemu_config_path(arch: str) -> str: 199 config_path = os.path.join(QEMU_CONFIGS_DIR, arch + '.py') 200 if os.path.isfile(config_path): 201 return config_path 202 203 options = [f[:-3] for f in os.listdir(QEMU_CONFIGS_DIR) if f.endswith('.py')] 204 raise ConfigError(arch + ' is not a valid arch, options are ' + str(sorted(options))) 205 206def _get_qemu_ops(config_path: str, 207 extra_qemu_args: Optional[List[str]], 208 cross_compile: Optional[str]) -> Tuple[str, LinuxSourceTreeOperations]: 209 # The module name/path has very little to do with where the actual file 210 # exists (I learned this through experimentation and could not find it 211 # anywhere in the Python documentation). 212 # 213 # Bascially, we completely ignore the actual file location of the config 214 # we are loading and just tell Python that the module lives in the 215 # QEMU_CONFIGS_DIR for import purposes regardless of where it actually 216 # exists as a file. 217 module_path = '.' + os.path.join(os.path.basename(QEMU_CONFIGS_DIR), os.path.basename(config_path)) 218 spec = importlib.util.spec_from_file_location(module_path, config_path) 219 assert spec is not None 220 config = importlib.util.module_from_spec(spec) 221 # See https://github.com/python/typeshed/pull/2626 for context. 222 assert isinstance(spec.loader, importlib.abc.Loader) 223 spec.loader.exec_module(config) 224 225 if not hasattr(config, 'QEMU_ARCH'): 226 raise ValueError('qemu_config module missing "QEMU_ARCH": ' + config_path) 227 params: qemu_config.QemuArchParams = config.QEMU_ARCH 228 if extra_qemu_args: 229 params.extra_qemu_params.extend(extra_qemu_args) 230 return params.linux_arch, LinuxSourceTreeOperationsQemu( 231 params, cross_compile=cross_compile) 232 233class LinuxSourceTree: 234 """Represents a Linux kernel source tree with KUnit tests.""" 235 236 def __init__( 237 self, 238 build_dir: str, 239 kunitconfig_paths: Optional[List[str]]=None, 240 kconfig_add: Optional[List[str]]=None, 241 arch: Optional[str]=None, 242 cross_compile: Optional[str]=None, 243 qemu_config_path: Optional[str]=None, 244 extra_qemu_args: Optional[List[str]]=None) -> None: 245 signal.signal(signal.SIGINT, self.signal_handler) 246 if qemu_config_path: 247 self._arch, self._ops = _get_qemu_ops(qemu_config_path, extra_qemu_args, cross_compile) 248 else: 249 self._arch = 'um' if arch is None else arch 250 if self._arch == 'um': 251 self._ops = LinuxSourceTreeOperationsUml(cross_compile=cross_compile) 252 else: 253 qemu_config_path = _default_qemu_config_path(self._arch) 254 _, self._ops = _get_qemu_ops(qemu_config_path, extra_qemu_args, cross_compile) 255 256 self._kconfig = get_parsed_kunitconfig(build_dir, kunitconfig_paths) 257 if kconfig_add: 258 kconfig = kunit_config.parse_from_string('\n'.join(kconfig_add)) 259 self._kconfig.merge_in_entries(kconfig) 260 261 def arch(self) -> str: 262 return self._arch 263 264 def clean(self) -> bool: 265 try: 266 self._ops.make_mrproper() 267 except ConfigError as e: 268 logging.error(e) 269 return False 270 return True 271 272 def validate_config(self, build_dir: str) -> bool: 273 kconfig_path = get_kconfig_path(build_dir) 274 validated_kconfig = kunit_config.parse_file(kconfig_path) 275 if self._kconfig.is_subset_of(validated_kconfig): 276 return True 277 missing = set(self._kconfig.as_entries()) - set(validated_kconfig.as_entries()) 278 message = 'Not all Kconfig options selected in kunitconfig were in the generated .config.\n' \ 279 'This is probably due to unsatisfied dependencies.\n' \ 280 'Missing: ' + ', '.join(str(e) for e in missing) 281 if self._arch == 'um': 282 message += '\nNote: many Kconfig options aren\'t available on UML. You can try running ' \ 283 'on a different architecture with something like "--arch=x86_64".' 284 logging.error(message) 285 return False 286 287 def build_config(self, build_dir: str, make_options: Optional[List[str]]) -> bool: 288 kconfig_path = get_kconfig_path(build_dir) 289 if build_dir and not os.path.exists(build_dir): 290 os.mkdir(build_dir) 291 try: 292 self._kconfig = self._ops.make_arch_config(self._kconfig) 293 self._kconfig.write_to_file(kconfig_path) 294 self._ops.make_olddefconfig(build_dir, make_options) 295 except ConfigError as e: 296 logging.error(e) 297 return False 298 if not self.validate_config(build_dir): 299 return False 300 301 old_path = get_old_kunitconfig_path(build_dir) 302 if os.path.exists(old_path): 303 os.remove(old_path) # write_to_file appends to the file 304 self._kconfig.write_to_file(old_path) 305 return True 306 307 def _kunitconfig_changed(self, build_dir: str) -> bool: 308 old_path = get_old_kunitconfig_path(build_dir) 309 if not os.path.exists(old_path): 310 return True 311 312 old_kconfig = kunit_config.parse_file(old_path) 313 return old_kconfig != self._kconfig 314 315 def build_reconfig(self, build_dir: str, make_options: Optional[List[str]]) -> bool: 316 """Creates a new .config if it is not a subset of the .kunitconfig.""" 317 kconfig_path = get_kconfig_path(build_dir) 318 if not os.path.exists(kconfig_path): 319 print('Generating .config ...') 320 return self.build_config(build_dir, make_options) 321 322 existing_kconfig = kunit_config.parse_file(kconfig_path) 323 self._kconfig = self._ops.make_arch_config(self._kconfig) 324 325 if self._kconfig.is_subset_of(existing_kconfig) and not self._kunitconfig_changed(build_dir): 326 return True 327 print('Regenerating .config ...') 328 os.remove(kconfig_path) 329 return self.build_config(build_dir, make_options) 330 331 def build_kernel(self, jobs: int, build_dir: str, make_options: Optional[List[str]]) -> bool: 332 try: 333 self._ops.make_olddefconfig(build_dir, make_options) 334 self._ops.make(jobs, build_dir, make_options) 335 except (ConfigError, BuildError) as e: 336 logging.error(e) 337 return False 338 return self.validate_config(build_dir) 339 340 def run_kernel(self, args: Optional[List[str]]=None, build_dir: str='', filter_glob: str='', filter: str='', filter_action: Optional[str]=None, timeout: Optional[int]=None) -> Iterator[str]: 341 if not args: 342 args = [] 343 if filter_glob: 344 args.append('kunit.filter_glob=' + filter_glob) 345 if filter: 346 args.append('kunit.filter="' + filter + '"') 347 if filter_action: 348 args.append('kunit.filter_action=' + filter_action) 349 args.append('kunit.enable=1') 350 351 process = self._ops.start(args, build_dir) 352 assert process.stdout is not None # tell mypy it's set 353 354 # Enforce the timeout in a background thread. 355 def _wait_proc() -> None: 356 try: 357 process.wait(timeout=timeout) 358 except Exception as e: 359 print(e) 360 process.terminate() 361 process.wait() 362 waiter = threading.Thread(target=_wait_proc) 363 waiter.start() 364 365 output = open(get_outfile_path(build_dir), 'w') 366 try: 367 # Tee the output to the file and to our caller in real time. 368 for line in process.stdout: 369 output.write(line) 370 yield line 371 # This runs even if our caller doesn't consume every line. 372 finally: 373 # Flush any leftover output to the file 374 output.write(process.stdout.read()) 375 output.close() 376 process.stdout.close() 377 378 waiter.join() 379 subprocess.call(['stty', 'sane']) 380 381 def signal_handler(self, unused_sig: int, unused_frame: Optional[FrameType]) -> None: 382 logging.error('Build interruption occurred. Cleaning console.') 383 subprocess.call(['stty', 'sane']) 384