xref: /freebsd/tests/atf_python/atf_pytest.py (revision 7c20397b724a55001c2054fa133a768e9d06eb1c)
1import types
2from typing import Any
3from typing import Dict
4from typing import List
5from typing import NamedTuple
6from typing import Tuple
7
8import pytest
9import os
10
11
12class ATFCleanupItem(pytest.Item):
13    def runtest(self):
14        """Runs cleanup procedure for the test instead of the test"""
15        instance = self.parent.cls()
16        instance.cleanup(self.nodeid)
17
18    def setup_method_noop(self, method):
19        """Overrides runtest setup method"""
20        pass
21
22    def teardown_method_noop(self, method):
23        """Overrides runtest teardown method"""
24        pass
25
26
27class ATFTestObj(object):
28    def __init__(self, obj, has_cleanup):
29        # Use nodeid without name to properly name class-derived tests
30        self.ident = obj.nodeid.split("::", 1)[1]
31        self.description = self._get_test_description(obj)
32        self.has_cleanup = has_cleanup
33        self.obj = obj
34
35    def _get_test_description(self, obj):
36        """Returns first non-empty line from func docstring or func name"""
37        docstr = obj.function.__doc__
38        if docstr:
39            for line in docstr.split("\n"):
40                if line:
41                    return line
42        return obj.name
43
44    def _convert_marks(self, obj) -> Dict[str, Any]:
45        wj_func = lambda x: " ".join(x)  # noqa: E731
46        _map: Dict[str, Dict] = {
47            "require_user": {"name": "require.user"},
48            "require_arch": {"name": "require.arch", "fmt": wj_func},
49            "require_diskspace": {"name": "require.diskspace"},
50            "require_files": {"name": "require.files", "fmt": wj_func},
51            "require_machine": {"name": "require.machine", "fmt": wj_func},
52            "require_memory": {"name": "require.memory"},
53            "require_progs": {"name": "require.progs", "fmt": wj_func},
54            "timeout": {},
55        }
56        ret = {}
57        for mark in obj.iter_markers():
58            if mark.name in _map:
59                name = _map[mark.name].get("name", mark.name)
60                if "fmt" in _map[mark.name]:
61                    val = _map[mark.name]["fmt"](mark.args[0])
62                else:
63                    val = mark.args[0]
64                ret[name] = val
65        return ret
66
67    def as_lines(self) -> List[str]:
68        """Output test definition in ATF-specific format"""
69        ret = []
70        ret.append("ident: {}".format(self.ident))
71        ret.append("descr: {}".format(self._get_test_description(self.obj)))
72        if self.has_cleanup:
73            ret.append("has.cleanup: true")
74        for key, value in self._convert_marks(self.obj).items():
75            ret.append("{}: {}".format(key, value))
76        return ret
77
78
79class ATFHandler(object):
80    class ReportState(NamedTuple):
81        state: str
82        reason: str
83
84    def __init__(self):
85        self._tests_state_map: Dict[str, ReportStatus] = {}
86
87    def override_runtest(self, obj):
88        # Override basic runtest command
89        obj.runtest = types.MethodType(ATFCleanupItem.runtest, obj)
90        # Override class setup/teardown
91        obj.parent.cls.setup_method = ATFCleanupItem.setup_method_noop
92        obj.parent.cls.teardown_method = ATFCleanupItem.teardown_method_noop
93
94    def get_object_cleanup_class(self, obj):
95        if hasattr(obj, "parent") and obj.parent is not None:
96            if hasattr(obj.parent, "cls") and obj.parent.cls is not None:
97                if hasattr(obj.parent.cls, "cleanup"):
98                    return obj.parent.cls
99        return None
100
101    def has_object_cleanup(self, obj):
102        return self.get_object_cleanup_class(obj) is not None
103
104    def list_tests(self, tests: List[str]):
105        print('Content-Type: application/X-atf-tp; version="1"')
106        print()
107        for test_obj in tests:
108            has_cleanup = self.has_object_cleanup(test_obj)
109            atf_test = ATFTestObj(test_obj, has_cleanup)
110            for line in atf_test.as_lines():
111                print(line)
112            print()
113
114    def set_report_state(self, test_name: str, state: str, reason: str):
115        self._tests_state_map[test_name] = self.ReportState(state, reason)
116
117    def _extract_report_reason(self, report):
118        data = report.longrepr
119        if data is None:
120            return None
121        if isinstance(data, Tuple):
122            # ('/path/to/test.py', 23, 'Skipped: unable to test')
123            reason = data[2]
124            for prefix in "Skipped: ":
125                if reason.startswith(prefix):
126                    reason = reason[len(prefix):]
127            return reason
128        else:
129            # string/ traceback / exception report. Capture the last line
130            return str(data).split("\n")[-1]
131        return None
132
133    def add_report(self, report):
134        # MAP pytest report state to the atf-desired state
135        #
136        # ATF test states:
137        # (1) expected_death, (2) expected_exit, (3) expected_failure
138        # (4) expected_signal, (5) expected_timeout, (6) passed
139        # (7) skipped, (8) failed
140        #
141        # Note that ATF don't have the concept of "soft xfail" - xpass
142        # is a failure. It also calls teardown routine in a separate
143        # process, thus teardown states (pytest-only) are handled as
144        # body continuation.
145
146        # (stage, state, wasxfail)
147
148        # Just a passing test: WANT: passed
149        # GOT: (setup, passed, F), (call, passed, F), (teardown, passed, F)
150        #
151        # Failing body test: WHAT: failed
152        # GOT: (setup, passed, F), (call, failed, F), (teardown, passed, F)
153        #
154        # pytest.skip test decorator: WANT: skipped
155        # GOT: (setup,skipped, False), (teardown, passed, False)
156        #
157        # pytest.skip call inside test function: WANT: skipped
158        # GOT: (setup, passed, F), (call, skipped, F), (teardown,passed, F)
159        #
160        # mark.xfail decorator+pytest.xfail: WANT: expected_failure
161        # GOT: (setup, passed, F), (call, skipped, T), (teardown, passed, F)
162        #
163        # mark.xfail decorator+pass: WANT: failed
164        # GOT: (setup, passed, F), (call, passed, T), (teardown, passed, F)
165
166        test_name = report.location[2]
167        stage = report.when
168        state = report.outcome
169        reason = self._extract_report_reason(report)
170
171        # We don't care about strict xfail - it gets translated to False
172
173        if stage == "setup":
174            if state in ("skipped", "failed"):
175                # failed init -> failed test, skipped setup -> xskip
176                # for the whole test
177                self.set_report_state(test_name, state, reason)
178        elif stage == "call":
179            # "call" stage shouldn't matter if setup failed
180            if test_name in self._tests_state_map:
181                if self._tests_state_map[test_name].state == "failed":
182                    return
183            if state == "failed":
184                # Record failure  & override "skipped" state
185                self.set_report_state(test_name, state, reason)
186            elif state == "skipped":
187                if hasattr(reason, "wasxfail"):
188                    # xfail() called in the test body
189                    state = "expected_failure"
190                else:
191                    # skip inside the body
192                    pass
193                self.set_report_state(test_name, state, reason)
194            elif state == "passed":
195                if hasattr(reason, "wasxfail"):
196                    # the test was expected to fail but didn't
197                    # mark as hard failure
198                    state = "failed"
199                self.set_report_state(test_name, state, reason)
200        elif stage == "teardown":
201            if state == "failed":
202                # teardown should be empty, as the cleanup
203                # procedures should be implemented as a separate
204                # function/method, so mark teardown failure as
205                # global failure
206                self.set_report_state(test_name, state, reason)
207
208    def write_report(self, path):
209        if self._tests_state_map:
210            # If we're executing in ATF mode, there has to be just one test
211            # Anyway, deterministically pick the first one
212            first_test_name = next(iter(self._tests_state_map))
213            test = self._tests_state_map[first_test_name]
214            if test.state == "passed":
215                line = test.state
216            else:
217                line = "{}: {}".format(test.state, test.reason)
218            with open(path, mode="w") as f:
219                print(line, file=f)
220
221    @staticmethod
222    def get_atf_vars() -> Dict[str, str]:
223        px = "_ATF_VAR_"
224        return {k[len(px):]: v for k, v in os.environ.items() if k.startswith(px)}
225