xref: /freebsd/tests/atf_python/atf_pytest.py (revision 3e5d0784b9b5296bda801add034b057ad68237f7)
18eb2bee6SAlexander V. Chernikovimport types
28eb2bee6SAlexander V. Chernikovfrom typing import Any
38eb2bee6SAlexander V. Chernikovfrom typing import Dict
48eb2bee6SAlexander V. Chernikovfrom typing import List
58eb2bee6SAlexander V. Chernikovfrom typing import NamedTuple
66332ef89SAlexander V. Chernikovfrom typing import Optional
78eb2bee6SAlexander V. Chernikovfrom typing import Tuple
88eb2bee6SAlexander V. Chernikov
9*3e5d0784SAlexander V. Chernikovfrom atf_python.ktest import generate_ktests
10d9af4219SAlexander V. Chernikovfrom atf_python.utils import nodeid_to_method_name
11d9af4219SAlexander V. Chernikov
128eb2bee6SAlexander V. Chernikovimport pytest
13513ce835SAlexander V. Chernikovimport os
148eb2bee6SAlexander V. Chernikov
158eb2bee6SAlexander V. Chernikov
168eb2bee6SAlexander V. Chernikovclass ATFCleanupItem(pytest.Item):
178eb2bee6SAlexander V. Chernikov    def runtest(self):
1889ffac3bSAlexander V. Chernikov        """Runs cleanup procedure for the test instead of the test itself"""
198eb2bee6SAlexander V. Chernikov        instance = self.parent.cls()
2089ffac3bSAlexander V. Chernikov        cleanup_name = "cleanup_{}".format(nodeid_to_method_name(self.nodeid))
2189ffac3bSAlexander V. Chernikov        if hasattr(instance, cleanup_name):
2289ffac3bSAlexander V. Chernikov            cleanup = getattr(instance, cleanup_name)
2389ffac3bSAlexander V. Chernikov            cleanup(self.nodeid)
2489ffac3bSAlexander V. Chernikov        elif hasattr(instance, "cleanup"):
258eb2bee6SAlexander V. Chernikov            instance.cleanup(self.nodeid)
268eb2bee6SAlexander V. Chernikov
278eb2bee6SAlexander V. Chernikov    def setup_method_noop(self, method):
288eb2bee6SAlexander V. Chernikov        """Overrides runtest setup method"""
298eb2bee6SAlexander V. Chernikov        pass
308eb2bee6SAlexander V. Chernikov
318eb2bee6SAlexander V. Chernikov    def teardown_method_noop(self, method):
328eb2bee6SAlexander V. Chernikov        """Overrides runtest teardown method"""
338eb2bee6SAlexander V. Chernikov        pass
348eb2bee6SAlexander V. Chernikov
358eb2bee6SAlexander V. Chernikov
368eb2bee6SAlexander V. Chernikovclass ATFTestObj(object):
378eb2bee6SAlexander V. Chernikov    def __init__(self, obj, has_cleanup):
388eb2bee6SAlexander V. Chernikov        # Use nodeid without name to properly name class-derived tests
398eb2bee6SAlexander V. Chernikov        self.ident = obj.nodeid.split("::", 1)[1]
408eb2bee6SAlexander V. Chernikov        self.description = self._get_test_description(obj)
418eb2bee6SAlexander V. Chernikov        self.has_cleanup = has_cleanup
428eb2bee6SAlexander V. Chernikov        self.obj = obj
438eb2bee6SAlexander V. Chernikov
448eb2bee6SAlexander V. Chernikov    def _get_test_description(self, obj):
458eb2bee6SAlexander V. Chernikov        """Returns first non-empty line from func docstring or func name"""
46*3e5d0784SAlexander V. Chernikov        if getattr(obj, "descr", None) is not None:
47*3e5d0784SAlexander V. Chernikov            return getattr(obj, "descr")
488eb2bee6SAlexander V. Chernikov        docstr = obj.function.__doc__
498eb2bee6SAlexander V. Chernikov        if docstr:
508eb2bee6SAlexander V. Chernikov            for line in docstr.split("\n"):
518eb2bee6SAlexander V. Chernikov                if line:
528eb2bee6SAlexander V. Chernikov                    return line
538eb2bee6SAlexander V. Chernikov        return obj.name
548eb2bee6SAlexander V. Chernikov
556332ef89SAlexander V. Chernikov    @staticmethod
566332ef89SAlexander V. Chernikov    def _convert_user_mark(mark, obj, ret: Dict):
576332ef89SAlexander V. Chernikov        username = mark.args[0]
586332ef89SAlexander V. Chernikov        if username == "unprivileged":
596332ef89SAlexander V. Chernikov            # Special unprivileged user requested.
606332ef89SAlexander V. Chernikov            # First, require the unprivileged-user config option presence
616332ef89SAlexander V. Chernikov            key = "require.config"
626332ef89SAlexander V. Chernikov            if key not in ret:
636332ef89SAlexander V. Chernikov                ret[key] = "unprivileged_user"
646332ef89SAlexander V. Chernikov            else:
656332ef89SAlexander V. Chernikov                ret[key] = "{} {}".format(ret[key], "unprivileged_user")
666332ef89SAlexander V. Chernikov        # Check if the framework requires root
676332ef89SAlexander V. Chernikov        test_cls = ATFHandler.get_test_class(obj)
686332ef89SAlexander V. Chernikov        if test_cls and getattr(test_cls, "NEED_ROOT", False):
696332ef89SAlexander V. Chernikov            # Yes, so we ask kyua to run us under root instead
706332ef89SAlexander V. Chernikov            # It is up to the implementation to switch back to the desired
716332ef89SAlexander V. Chernikov            # user
726332ef89SAlexander V. Chernikov            ret["require.user"] = "root"
736332ef89SAlexander V. Chernikov        else:
746332ef89SAlexander V. Chernikov            ret["require.user"] = username
756332ef89SAlexander V. Chernikov
768eb2bee6SAlexander V. Chernikov    def _convert_marks(self, obj) -> Dict[str, Any]:
778eb2bee6SAlexander V. Chernikov        wj_func = lambda x: " ".join(x)  # noqa: E731
788eb2bee6SAlexander V. Chernikov        _map: Dict[str, Dict] = {
796332ef89SAlexander V. Chernikov            "require_user": {"handler": self._convert_user_mark},
808eb2bee6SAlexander V. Chernikov            "require_arch": {"name": "require.arch", "fmt": wj_func},
818eb2bee6SAlexander V. Chernikov            "require_diskspace": {"name": "require.diskspace"},
828eb2bee6SAlexander V. Chernikov            "require_files": {"name": "require.files", "fmt": wj_func},
838eb2bee6SAlexander V. Chernikov            "require_machine": {"name": "require.machine", "fmt": wj_func},
848eb2bee6SAlexander V. Chernikov            "require_memory": {"name": "require.memory"},
858eb2bee6SAlexander V. Chernikov            "require_progs": {"name": "require.progs", "fmt": wj_func},
868eb2bee6SAlexander V. Chernikov            "timeout": {},
878eb2bee6SAlexander V. Chernikov        }
888eb2bee6SAlexander V. Chernikov        ret = {}
898eb2bee6SAlexander V. Chernikov        for mark in obj.iter_markers():
908eb2bee6SAlexander V. Chernikov            if mark.name in _map:
916332ef89SAlexander V. Chernikov                if "handler" in _map[mark.name]:
926332ef89SAlexander V. Chernikov                    _map[mark.name]["handler"](mark, obj, ret)
936332ef89SAlexander V. Chernikov                    continue
948eb2bee6SAlexander V. Chernikov                name = _map[mark.name].get("name", mark.name)
958eb2bee6SAlexander V. Chernikov                if "fmt" in _map[mark.name]:
968eb2bee6SAlexander V. Chernikov                    val = _map[mark.name]["fmt"](mark.args[0])
978eb2bee6SAlexander V. Chernikov                else:
988eb2bee6SAlexander V. Chernikov                    val = mark.args[0]
998eb2bee6SAlexander V. Chernikov                ret[name] = val
1008eb2bee6SAlexander V. Chernikov        return ret
1018eb2bee6SAlexander V. Chernikov
1028eb2bee6SAlexander V. Chernikov    def as_lines(self) -> List[str]:
1038eb2bee6SAlexander V. Chernikov        """Output test definition in ATF-specific format"""
1048eb2bee6SAlexander V. Chernikov        ret = []
1058eb2bee6SAlexander V. Chernikov        ret.append("ident: {}".format(self.ident))
1068eb2bee6SAlexander V. Chernikov        ret.append("descr: {}".format(self._get_test_description(self.obj)))
1078eb2bee6SAlexander V. Chernikov        if self.has_cleanup:
1088eb2bee6SAlexander V. Chernikov            ret.append("has.cleanup: true")
1098eb2bee6SAlexander V. Chernikov        for key, value in self._convert_marks(self.obj).items():
1108eb2bee6SAlexander V. Chernikov            ret.append("{}: {}".format(key, value))
1118eb2bee6SAlexander V. Chernikov        return ret
1128eb2bee6SAlexander V. Chernikov
1138eb2bee6SAlexander V. Chernikov
1148eb2bee6SAlexander V. Chernikovclass ATFHandler(object):
1158eb2bee6SAlexander V. Chernikov    class ReportState(NamedTuple):
1168eb2bee6SAlexander V. Chernikov        state: str
1178eb2bee6SAlexander V. Chernikov        reason: str
1188eb2bee6SAlexander V. Chernikov
1196332ef89SAlexander V. Chernikov    def __init__(self, report_file_name: Optional[str]):
1208eb2bee6SAlexander V. Chernikov        self._tests_state_map: Dict[str, ReportStatus] = {}
1216332ef89SAlexander V. Chernikov        self._report_file_name = report_file_name
1226332ef89SAlexander V. Chernikov        self._report_file_handle = None
1236332ef89SAlexander V. Chernikov
1246332ef89SAlexander V. Chernikov    def setup_configure(self):
1256332ef89SAlexander V. Chernikov        fname = self._report_file_name
1266332ef89SAlexander V. Chernikov        if fname:
1276332ef89SAlexander V. Chernikov            self._report_file_handle = open(fname, mode="w")
1286332ef89SAlexander V. Chernikov
1296332ef89SAlexander V. Chernikov    def setup_method_pre(self, item):
1306332ef89SAlexander V. Chernikov        """Called before actually running the test setup_method"""
1316332ef89SAlexander V. Chernikov        # Check if we need to manually drop the privileges
1326332ef89SAlexander V. Chernikov        for mark in item.iter_markers():
1336332ef89SAlexander V. Chernikov            if mark.name == "require_user":
1346332ef89SAlexander V. Chernikov                cls = self.get_test_class(item)
1356332ef89SAlexander V. Chernikov                cls.TARGET_USER = mark.args[0]
1366332ef89SAlexander V. Chernikov                break
1378eb2bee6SAlexander V. Chernikov
1388eb2bee6SAlexander V. Chernikov    def override_runtest(self, obj):
1398eb2bee6SAlexander V. Chernikov        # Override basic runtest command
1408eb2bee6SAlexander V. Chernikov        obj.runtest = types.MethodType(ATFCleanupItem.runtest, obj)
1418eb2bee6SAlexander V. Chernikov        # Override class setup/teardown
1428eb2bee6SAlexander V. Chernikov        obj.parent.cls.setup_method = ATFCleanupItem.setup_method_noop
1438eb2bee6SAlexander V. Chernikov        obj.parent.cls.teardown_method = ATFCleanupItem.teardown_method_noop
1448eb2bee6SAlexander V. Chernikov
14589ffac3bSAlexander V. Chernikov    @staticmethod
14689ffac3bSAlexander V. Chernikov    def get_test_class(obj):
1478eb2bee6SAlexander V. Chernikov        if hasattr(obj, "parent") and obj.parent is not None:
14889ffac3bSAlexander V. Chernikov            if hasattr(obj.parent, "cls"):
1498eb2bee6SAlexander V. Chernikov                return obj.parent.cls
1508eb2bee6SAlexander V. Chernikov
1518eb2bee6SAlexander V. Chernikov    def has_object_cleanup(self, obj):
15289ffac3bSAlexander V. Chernikov        cls = self.get_test_class(obj)
15389ffac3bSAlexander V. Chernikov        if cls is not None:
15489ffac3bSAlexander V. Chernikov            method_name = nodeid_to_method_name(obj.nodeid)
15589ffac3bSAlexander V. Chernikov            cleanup_name = "cleanup_{}".format(method_name)
15689ffac3bSAlexander V. Chernikov            if hasattr(cls, "cleanup") or hasattr(cls, cleanup_name):
15789ffac3bSAlexander V. Chernikov                return True
15889ffac3bSAlexander V. Chernikov        return False
1598eb2bee6SAlexander V. Chernikov
160d9af4219SAlexander V. Chernikov    def _generate_test_cleanups(self, items):
161d9af4219SAlexander V. Chernikov        new_items = []
162d9af4219SAlexander V. Chernikov        for obj in items:
163d9af4219SAlexander V. Chernikov            if self.has_object_cleanup(obj):
164d9af4219SAlexander V. Chernikov                self.override_runtest(obj)
165d9af4219SAlexander V. Chernikov                new_items.append(obj)
166d9af4219SAlexander V. Chernikov        items.clear()
167d9af4219SAlexander V. Chernikov        items.extend(new_items)
168d9af4219SAlexander V. Chernikov
169*3e5d0784SAlexander V. Chernikov    def expand_tests(self, collector, name, obj):
170*3e5d0784SAlexander V. Chernikov        return generate_ktests(collector, name, obj)
171*3e5d0784SAlexander V. Chernikov
172d9af4219SAlexander V. Chernikov    def modify_tests(self, items, config):
173d9af4219SAlexander V. Chernikov        if config.option.atf_cleanup:
174d9af4219SAlexander V. Chernikov            self._generate_test_cleanups(items)
175d9af4219SAlexander V. Chernikov
1768eb2bee6SAlexander V. Chernikov    def list_tests(self, tests: List[str]):
1778eb2bee6SAlexander V. Chernikov        print('Content-Type: application/X-atf-tp; version="1"')
1788eb2bee6SAlexander V. Chernikov        print()
1798eb2bee6SAlexander V. Chernikov        for test_obj in tests:
1808eb2bee6SAlexander V. Chernikov            has_cleanup = self.has_object_cleanup(test_obj)
1818eb2bee6SAlexander V. Chernikov            atf_test = ATFTestObj(test_obj, has_cleanup)
1828eb2bee6SAlexander V. Chernikov            for line in atf_test.as_lines():
1838eb2bee6SAlexander V. Chernikov                print(line)
1848eb2bee6SAlexander V. Chernikov            print()
1858eb2bee6SAlexander V. Chernikov
1868eb2bee6SAlexander V. Chernikov    def set_report_state(self, test_name: str, state: str, reason: str):
1878eb2bee6SAlexander V. Chernikov        self._tests_state_map[test_name] = self.ReportState(state, reason)
1888eb2bee6SAlexander V. Chernikov
1898eb2bee6SAlexander V. Chernikov    def _extract_report_reason(self, report):
1908eb2bee6SAlexander V. Chernikov        data = report.longrepr
1918eb2bee6SAlexander V. Chernikov        if data is None:
1928eb2bee6SAlexander V. Chernikov            return None
1938eb2bee6SAlexander V. Chernikov        if isinstance(data, Tuple):
1948eb2bee6SAlexander V. Chernikov            # ('/path/to/test.py', 23, 'Skipped: unable to test')
1958eb2bee6SAlexander V. Chernikov            reason = data[2]
1968eb2bee6SAlexander V. Chernikov            for prefix in "Skipped: ":
1978eb2bee6SAlexander V. Chernikov                if reason.startswith(prefix):
1988eb2bee6SAlexander V. Chernikov                    reason = reason[len(prefix):]
1998eb2bee6SAlexander V. Chernikov            return reason
2008eb2bee6SAlexander V. Chernikov        else:
2018eb2bee6SAlexander V. Chernikov            # string/ traceback / exception report. Capture the last line
2028eb2bee6SAlexander V. Chernikov            return str(data).split("\n")[-1]
2038eb2bee6SAlexander V. Chernikov        return None
2048eb2bee6SAlexander V. Chernikov
2058eb2bee6SAlexander V. Chernikov    def add_report(self, report):
2068eb2bee6SAlexander V. Chernikov        # MAP pytest report state to the atf-desired state
2078eb2bee6SAlexander V. Chernikov        #
2088eb2bee6SAlexander V. Chernikov        # ATF test states:
2098eb2bee6SAlexander V. Chernikov        # (1) expected_death, (2) expected_exit, (3) expected_failure
2108eb2bee6SAlexander V. Chernikov        # (4) expected_signal, (5) expected_timeout, (6) passed
2118eb2bee6SAlexander V. Chernikov        # (7) skipped, (8) failed
2128eb2bee6SAlexander V. Chernikov        #
2138eb2bee6SAlexander V. Chernikov        # Note that ATF don't have the concept of "soft xfail" - xpass
2148eb2bee6SAlexander V. Chernikov        # is a failure. It also calls teardown routine in a separate
2158eb2bee6SAlexander V. Chernikov        # process, thus teardown states (pytest-only) are handled as
2168eb2bee6SAlexander V. Chernikov        # body continuation.
2178eb2bee6SAlexander V. Chernikov
2188eb2bee6SAlexander V. Chernikov        # (stage, state, wasxfail)
2198eb2bee6SAlexander V. Chernikov
2208eb2bee6SAlexander V. Chernikov        # Just a passing test: WANT: passed
2218eb2bee6SAlexander V. Chernikov        # GOT: (setup, passed, F), (call, passed, F), (teardown, passed, F)
2228eb2bee6SAlexander V. Chernikov        #
2238eb2bee6SAlexander V. Chernikov        # Failing body test: WHAT: failed
2248eb2bee6SAlexander V. Chernikov        # GOT: (setup, passed, F), (call, failed, F), (teardown, passed, F)
2258eb2bee6SAlexander V. Chernikov        #
2268eb2bee6SAlexander V. Chernikov        # pytest.skip test decorator: WANT: skipped
2278eb2bee6SAlexander V. Chernikov        # GOT: (setup,skipped, False), (teardown, passed, False)
2288eb2bee6SAlexander V. Chernikov        #
2298eb2bee6SAlexander V. Chernikov        # pytest.skip call inside test function: WANT: skipped
2308eb2bee6SAlexander V. Chernikov        # GOT: (setup, passed, F), (call, skipped, F), (teardown,passed, F)
2318eb2bee6SAlexander V. Chernikov        #
2328eb2bee6SAlexander V. Chernikov        # mark.xfail decorator+pytest.xfail: WANT: expected_failure
2338eb2bee6SAlexander V. Chernikov        # GOT: (setup, passed, F), (call, skipped, T), (teardown, passed, F)
2348eb2bee6SAlexander V. Chernikov        #
2358eb2bee6SAlexander V. Chernikov        # mark.xfail decorator+pass: WANT: failed
2368eb2bee6SAlexander V. Chernikov        # GOT: (setup, passed, F), (call, passed, T), (teardown, passed, F)
2378eb2bee6SAlexander V. Chernikov
2388eb2bee6SAlexander V. Chernikov        test_name = report.location[2]
2398eb2bee6SAlexander V. Chernikov        stage = report.when
2408eb2bee6SAlexander V. Chernikov        state = report.outcome
2418eb2bee6SAlexander V. Chernikov        reason = self._extract_report_reason(report)
2428eb2bee6SAlexander V. Chernikov
2438eb2bee6SAlexander V. Chernikov        # We don't care about strict xfail - it gets translated to False
2448eb2bee6SAlexander V. Chernikov
2458eb2bee6SAlexander V. Chernikov        if stage == "setup":
2468eb2bee6SAlexander V. Chernikov            if state in ("skipped", "failed"):
2478eb2bee6SAlexander V. Chernikov                # failed init -> failed test, skipped setup -> xskip
2488eb2bee6SAlexander V. Chernikov                # for the whole test
2498eb2bee6SAlexander V. Chernikov                self.set_report_state(test_name, state, reason)
2508eb2bee6SAlexander V. Chernikov        elif stage == "call":
2518eb2bee6SAlexander V. Chernikov            # "call" stage shouldn't matter if setup failed
2528eb2bee6SAlexander V. Chernikov            if test_name in self._tests_state_map:
2538eb2bee6SAlexander V. Chernikov                if self._tests_state_map[test_name].state == "failed":
2548eb2bee6SAlexander V. Chernikov                    return
2558eb2bee6SAlexander V. Chernikov            if state == "failed":
2568eb2bee6SAlexander V. Chernikov                # Record failure  & override "skipped" state
2578eb2bee6SAlexander V. Chernikov                self.set_report_state(test_name, state, reason)
2588eb2bee6SAlexander V. Chernikov            elif state == "skipped":
2598eb2bee6SAlexander V. Chernikov                if hasattr(reason, "wasxfail"):
2608eb2bee6SAlexander V. Chernikov                    # xfail() called in the test body
2618eb2bee6SAlexander V. Chernikov                    state = "expected_failure"
2628eb2bee6SAlexander V. Chernikov                else:
2638eb2bee6SAlexander V. Chernikov                    # skip inside the body
2648eb2bee6SAlexander V. Chernikov                    pass
2658eb2bee6SAlexander V. Chernikov                self.set_report_state(test_name, state, reason)
2668eb2bee6SAlexander V. Chernikov            elif state == "passed":
2678eb2bee6SAlexander V. Chernikov                if hasattr(reason, "wasxfail"):
2688eb2bee6SAlexander V. Chernikov                    # the test was expected to fail but didn't
2698eb2bee6SAlexander V. Chernikov                    # mark as hard failure
2708eb2bee6SAlexander V. Chernikov                    state = "failed"
2718eb2bee6SAlexander V. Chernikov                self.set_report_state(test_name, state, reason)
2728eb2bee6SAlexander V. Chernikov        elif stage == "teardown":
2738eb2bee6SAlexander V. Chernikov            if state == "failed":
2748eb2bee6SAlexander V. Chernikov                # teardown should be empty, as the cleanup
2758eb2bee6SAlexander V. Chernikov                # procedures should be implemented as a separate
2768eb2bee6SAlexander V. Chernikov                # function/method, so mark teardown failure as
2778eb2bee6SAlexander V. Chernikov                # global failure
2788eb2bee6SAlexander V. Chernikov                self.set_report_state(test_name, state, reason)
2798eb2bee6SAlexander V. Chernikov
2806332ef89SAlexander V. Chernikov    def write_report(self):
2816332ef89SAlexander V. Chernikov        if self._report_file_handle is None:
2826332ef89SAlexander V. Chernikov            return
2838eb2bee6SAlexander V. Chernikov        if self._tests_state_map:
2848eb2bee6SAlexander V. Chernikov            # If we're executing in ATF mode, there has to be just one test
2858eb2bee6SAlexander V. Chernikov            # Anyway, deterministically pick the first one
2868eb2bee6SAlexander V. Chernikov            first_test_name = next(iter(self._tests_state_map))
2878eb2bee6SAlexander V. Chernikov            test = self._tests_state_map[first_test_name]
2888eb2bee6SAlexander V. Chernikov            if test.state == "passed":
2898eb2bee6SAlexander V. Chernikov                line = test.state
2908eb2bee6SAlexander V. Chernikov            else:
2918eb2bee6SAlexander V. Chernikov                line = "{}: {}".format(test.state, test.reason)
2926332ef89SAlexander V. Chernikov            print(line, file=self._report_file_handle)
2936332ef89SAlexander V. Chernikov        self._report_file_handle.close()
294513ce835SAlexander V. Chernikov
295513ce835SAlexander V. Chernikov    @staticmethod
296513ce835SAlexander V. Chernikov    def get_atf_vars() -> Dict[str, str]:
297513ce835SAlexander V. Chernikov        px = "_ATF_VAR_"
298513ce835SAlexander V. Chernikov        return {k[len(px):]: v for k, v in os.environ.items() if k.startswith(px)}
299