xref: /freebsd/tests/atf_python/utils.py (revision 7659d0fa2b873374623e7582e38fb2fe92056c87)
1#!/usr/bin/env python3
2import os
3import pwd
4from ctypes import CDLL
5from ctypes import get_errno
6from ctypes.util import find_library
7from typing import Dict
8from typing import List
9from typing import Optional
10
11import pytest
12
13
14def nodeid_to_method_name(nodeid: str) -> str:
15    """file_name.py::ClassName::method_name[parametrize] -> method_name"""
16    return nodeid.split("::")[-1].split("[")[0]
17
18
19class LibCWrapper(object):
20    def __init__(self):
21        path: Optional[str] = find_library("c")
22        if path is None:
23            raise RuntimeError("libc not found")
24        self._libc = CDLL(path, use_errno=True)
25
26    def modfind(self, mod_name: str) -> int:
27        if self._libc.modfind(bytes(mod_name, encoding="ascii")) == -1:
28            return get_errno()
29        return 0
30
31    def kldload(self, kld_name: str) -> int:
32        if self._libc.kldload(bytes(kld_name, encoding="ascii")) == -1:
33            return get_errno()
34        return 0
35
36    def jail_attach(self, jid: int) -> int:
37        if self._libc.jail_attach(jid) != 0:
38            return get_errno()
39        return 0
40
41
42libc = LibCWrapper()
43
44
45class BaseTest(object):
46    NEED_ROOT: bool = False  # True if the class needs root privileges for the setup
47    TARGET_USER = None  # Set to the target user by the framework
48    REQUIRED_MODULES: List[str] = []
49    SKIP_MODULES: List[str] = []
50
51    def require_module(self, mod_name: str, skip=True):
52        error_code = libc.modfind(mod_name)
53        if error_code == 0:
54            return
55        err_str = os.strerror(error_code)
56        txt = "kernel module '{}' not available: {}".format(mod_name, err_str)
57        if skip:
58            pytest.skip(txt)
59        else:
60            raise ValueError(txt)
61
62    def skip_module(self, mod_name: str):
63        error_code = libc.modfind(mod_name)
64        if error_code == 0:
65            txt = "kernel module '{}' loaded, skip test".format(mod_name)
66            pytest.skip(txt)
67            return
68
69    def _check_modules(self):
70        for mod_name in self.REQUIRED_MODULES:
71            self.require_module(mod_name)
72        for mod_name in self.SKIP_MODULES:
73            self.skip_module(mod_name)
74
75    @property
76    def atf_vars(self) -> Dict[str, str]:
77        px = "_ATF_VAR_"
78        return {k[len(px):]: v for k, v in os.environ.items() if k.startswith(px)}
79
80    def drop_privileges_user(self, user: str):
81        uid = pwd.getpwnam(user)[2]
82        print("Dropping privs to {}/{}".format(user, uid))
83        os.setuid(uid)
84
85    def drop_privileges(self):
86        if self.TARGET_USER:
87            if self.TARGET_USER == "unprivileged":
88                user = self.atf_vars["unprivileged-user"]
89            else:
90                user = self.TARGET_USER
91            self.drop_privileges_user(user)
92
93    @property
94    def test_id(self) -> str:
95        # 'test_ip6_output.py::TestIP6Output::test_output6_pktinfo[ipandif] (setup)'
96        return os.environ.get("PYTEST_CURRENT_TEST").split(" ")[0]
97
98    def setup_method(self, method):
99        """Run all pre-requisits for the test execution"""
100        self._check_modules()
101