#!/usr/bin/env python3
import os
import pwd
from ctypes import CDLL
from ctypes import get_errno
from ctypes.util import find_library
from typing import Dict
from typing import List
from typing import Optional

import pytest


def nodeid_to_method_name(nodeid: str) -> str:
    """file_name.py::ClassName::method_name[parametrize] -> method_name"""
    # Last "::"-separated component, with any "[...]" suffix cut off.
    return nodeid.split("::")[-1].split("[")[0]


class LibCWrapper(object):
    """Thin ctypes wrapper around a few libc calls.

    Every wrapper method returns 0 on success and the errno value
    reported by ctypes on failure, instead of raising.
    """

    def __init__(self):
        """Locate and load libc with errno capture enabled.

        Raises:
            RuntimeError: if ``find_library("c")`` cannot locate libc.
        """
        path: Optional[str] = find_library("c")
        if path is None:
            raise RuntimeError("libc not found")
        # use_errno=True is what makes get_errno() meaningful after a call.
        self._libc = CDLL(path, use_errno=True)

    def modfind(self, mod_name: str) -> int:
        """Call libc ``modfind`` for *mod_name*; return 0 or the errno."""
        if self._libc.modfind(bytes(mod_name, encoding="ascii")) == -1:
            return get_errno()
        return 0

    def kldload(self, kld_name: str) -> int:
        """Call libc ``kldload`` for *kld_name*; return 0 or the errno."""
        if self._libc.kldload(bytes(kld_name, encoding="ascii")) == -1:
            return get_errno()
        return 0

    def jail_attach(self, jid: int) -> int:
        """Call libc ``jail_attach`` for *jid*; return 0 or the errno."""
        if self._libc.jail_attach(jid) != 0:
            return get_errno()
        return 0


# Module-level instance shared by the helpers below; created at import time.
libc = LibCWrapper()


class BaseTest(object):
    """Base class for tests: module checks, ATF variables, privilege drop."""

    NEED_ROOT: bool = False  # True if the class needs root privileges for the setup
    TARGET_USER = None  # Set to the target user by the framework
    # These lists are class attributes shared by all subclasses that do not
    # override them; override in a subclass instead of mutating in place.
    REQUIRED_MODULES: List[str] = []
    SKIP_MODULES: List[str] = []

    def require_module(self, mod_name: str, skip: bool = True):
        """Ensure that ``libc.modfind`` succeeds for *mod_name*.

        Args:
            mod_name: name passed to ``modfind``.
            skip: when true (default) the test is skipped via
                ``pytest.skip`` if the lookup fails; otherwise
                ``ValueError`` is raised.

        Raises:
            ValueError: the lookup failed and *skip* is false.
        """
        error_code = libc.modfind(mod_name)
        if error_code == 0:
            return
        err_str = os.strerror(error_code)
        txt = f"kernel module '{mod_name}' not available: {err_str}"
        if skip:
            pytest.skip(txt)
        else:
            raise ValueError(txt)

    def skip_module(self, mod_name: str):
        """Skip the current test if ``libc.modfind`` succeeds for *mod_name*."""
        if libc.modfind(mod_name) == 0:
            pytest.skip(f"kernel module '{mod_name}' loaded, skip test")

    def _check_modules(self):
        """Apply REQUIRED_MODULES and then SKIP_MODULES checks."""
        for mod_name in self.REQUIRED_MODULES:
            self.require_module(mod_name)
        for mod_name in self.SKIP_MODULES:
            self.skip_module(mod_name)

    @property
    def atf_vars(self) -> Dict[str, str]:
        """Environment variables prefixed with ``_ATF_VAR_``, prefix removed."""
        px = "_ATF_VAR_"
        return {k[len(px):]: v for k, v in os.environ.items() if k.startswith(px)}

    def drop_privileges_user(self, user: str):
        """Switch the process UID to that of *user*.

        Raises:
            KeyError: from ``pwd.getpwnam`` if *user* does not exist.
        """
        uid = pwd.getpwnam(user).pw_uid
        print(f"Dropping privs to {user}/{uid}")
        # NOTE(review): only the UID is changed; the GID and supplementary
        # groups are left as they were. Confirm this is intended before
        # relying on it for isolation.
        os.setuid(uid)

    def drop_privileges(self):
        """Drop privileges to TARGET_USER, if one is set.

        The special value ``"unprivileged"`` is resolved through the
        ``unprivileged-user`` entry of :attr:`atf_vars`.
        """
        if self.TARGET_USER:
            if self.TARGET_USER == "unprivileged":
                user = self.atf_vars["unprivileged-user"]
            else:
                user = self.TARGET_USER
            self.drop_privileges_user(user)

    @property
    def test_id(self) -> str:
        """First space-separated token of ``PYTEST_CURRENT_TEST``.

        Returns an empty string when the variable is not set, rather than
        failing with ``AttributeError`` on ``None``.
        """
        # 'test_ip6_output.py::TestIP6Output::test_output6_pktinfo[ipandif] (setup)'
        current = os.environ.get("PYTEST_CURRENT_TEST", "")
        return current.split(" ")[0]

    def setup_method(self, method):
        """Run all prerequisites for the test execution"""
        self._check_modules()