xref: /freebsd/tests/atf_python/atf_pytest.py (revision 091febc04a5d5065fe633fe29c11f008dc392ab8)
1import types
2from typing import Any
3from typing import Dict
4from typing import List
5from typing import NamedTuple
6from typing import Tuple
7
8import pytest
9
10
11class ATFCleanupItem(pytest.Item):
12    def runtest(self):
13        """Runs cleanup procedure for the test instead of the test"""
14        instance = self.parent.cls()
15        instance.cleanup(self.nodeid)
16
17    def setup_method_noop(self, method):
18        """Overrides runtest setup method"""
19        pass
20
21    def teardown_method_noop(self, method):
22        """Overrides runtest teardown method"""
23        pass
24
25
26class ATFTestObj(object):
27    def __init__(self, obj, has_cleanup):
28        # Use nodeid without name to properly name class-derived tests
29        self.ident = obj.nodeid.split("::", 1)[1]
30        self.description = self._get_test_description(obj)
31        self.has_cleanup = has_cleanup
32        self.obj = obj
33
34    def _get_test_description(self, obj):
35        """Returns first non-empty line from func docstring or func name"""
36        docstr = obj.function.__doc__
37        if docstr:
38            for line in docstr.split("\n"):
39                if line:
40                    return line
41        return obj.name
42
43    def _convert_marks(self, obj) -> Dict[str, Any]:
44        wj_func = lambda x: " ".join(x)  # noqa: E731
45        _map: Dict[str, Dict] = {
46            "require_user": {"name": "require.user"},
47            "require_arch": {"name": "require.arch", "fmt": wj_func},
48            "require_diskspace": {"name": "require.diskspace"},
49            "require_files": {"name": "require.files", "fmt": wj_func},
50            "require_machine": {"name": "require.machine", "fmt": wj_func},
51            "require_memory": {"name": "require.memory"},
52            "require_progs": {"name": "require.progs", "fmt": wj_func},
53            "timeout": {},
54        }
55        ret = {}
56        for mark in obj.iter_markers():
57            if mark.name in _map:
58                name = _map[mark.name].get("name", mark.name)
59                if "fmt" in _map[mark.name]:
60                    val = _map[mark.name]["fmt"](mark.args[0])
61                else:
62                    val = mark.args[0]
63                ret[name] = val
64        return ret
65
66    def as_lines(self) -> List[str]:
67        """Output test definition in ATF-specific format"""
68        ret = []
69        ret.append("ident: {}".format(self.ident))
70        ret.append("descr: {}".format(self._get_test_description(self.obj)))
71        if self.has_cleanup:
72            ret.append("has.cleanup: true")
73        for key, value in self._convert_marks(self.obj).items():
74            ret.append("{}: {}".format(key, value))
75        return ret
76
77
78class ATFHandler(object):
79    class ReportState(NamedTuple):
80        state: str
81        reason: str
82
83    def __init__(self):
84        self._tests_state_map: Dict[str, ReportStatus] = {}
85
86    def override_runtest(self, obj):
87        # Override basic runtest command
88        obj.runtest = types.MethodType(ATFCleanupItem.runtest, obj)
89        # Override class setup/teardown
90        obj.parent.cls.setup_method = ATFCleanupItem.setup_method_noop
91        obj.parent.cls.teardown_method = ATFCleanupItem.teardown_method_noop
92
93    def get_object_cleanup_class(self, obj):
94        if hasattr(obj, "parent") and obj.parent is not None:
95            if hasattr(obj.parent, "cls") and obj.parent.cls is not None:
96                if hasattr(obj.parent.cls, "cleanup"):
97                    return obj.parent.cls
98        return None
99
100    def has_object_cleanup(self, obj):
101        return self.get_object_cleanup_class(obj) is not None
102
103    def list_tests(self, tests: List[str]):
104        print('Content-Type: application/X-atf-tp; version="1"')
105        print()
106        for test_obj in tests:
107            has_cleanup = self.has_object_cleanup(test_obj)
108            atf_test = ATFTestObj(test_obj, has_cleanup)
109            for line in atf_test.as_lines():
110                print(line)
111            print()
112
113    def set_report_state(self, test_name: str, state: str, reason: str):
114        self._tests_state_map[test_name] = self.ReportState(state, reason)
115
116    def _extract_report_reason(self, report):
117        data = report.longrepr
118        if data is None:
119            return None
120        if isinstance(data, Tuple):
121            # ('/path/to/test.py', 23, 'Skipped: unable to test')
122            reason = data[2]
123            for prefix in "Skipped: ":
124                if reason.startswith(prefix):
125                    reason = reason[len(prefix):]
126            return reason
127        else:
128            # string/ traceback / exception report. Capture the last line
129            return str(data).split("\n")[-1]
130        return None
131
132    def add_report(self, report):
133        # MAP pytest report state to the atf-desired state
134        #
135        # ATF test states:
136        # (1) expected_death, (2) expected_exit, (3) expected_failure
137        # (4) expected_signal, (5) expected_timeout, (6) passed
138        # (7) skipped, (8) failed
139        #
140        # Note that ATF don't have the concept of "soft xfail" - xpass
141        # is a failure. It also calls teardown routine in a separate
142        # process, thus teardown states (pytest-only) are handled as
143        # body continuation.
144
145        # (stage, state, wasxfail)
146
147        # Just a passing test: WANT: passed
148        # GOT: (setup, passed, F), (call, passed, F), (teardown, passed, F)
149        #
150        # Failing body test: WHAT: failed
151        # GOT: (setup, passed, F), (call, failed, F), (teardown, passed, F)
152        #
153        # pytest.skip test decorator: WANT: skipped
154        # GOT: (setup,skipped, False), (teardown, passed, False)
155        #
156        # pytest.skip call inside test function: WANT: skipped
157        # GOT: (setup, passed, F), (call, skipped, F), (teardown,passed, F)
158        #
159        # mark.xfail decorator+pytest.xfail: WANT: expected_failure
160        # GOT: (setup, passed, F), (call, skipped, T), (teardown, passed, F)
161        #
162        # mark.xfail decorator+pass: WANT: failed
163        # GOT: (setup, passed, F), (call, passed, T), (teardown, passed, F)
164
165        test_name = report.location[2]
166        stage = report.when
167        state = report.outcome
168        reason = self._extract_report_reason(report)
169
170        # We don't care about strict xfail - it gets translated to False
171
172        if stage == "setup":
173            if state in ("skipped", "failed"):
174                # failed init -> failed test, skipped setup -> xskip
175                # for the whole test
176                self.set_report_state(test_name, state, reason)
177        elif stage == "call":
178            # "call" stage shouldn't matter if setup failed
179            if test_name in self._tests_state_map:
180                if self._tests_state_map[test_name].state == "failed":
181                    return
182            if state == "failed":
183                # Record failure  & override "skipped" state
184                self.set_report_state(test_name, state, reason)
185            elif state == "skipped":
186                if hasattr(reason, "wasxfail"):
187                    # xfail() called in the test body
188                    state = "expected_failure"
189                else:
190                    # skip inside the body
191                    pass
192                self.set_report_state(test_name, state, reason)
193            elif state == "passed":
194                if hasattr(reason, "wasxfail"):
195                    # the test was expected to fail but didn't
196                    # mark as hard failure
197                    state = "failed"
198                self.set_report_state(test_name, state, reason)
199        elif stage == "teardown":
200            if state == "failed":
201                # teardown should be empty, as the cleanup
202                # procedures should be implemented as a separate
203                # function/method, so mark teardown failure as
204                # global failure
205                self.set_report_state(test_name, state, reason)
206
207    def write_report(self, path):
208        if self._tests_state_map:
209            # If we're executing in ATF mode, there has to be just one test
210            # Anyway, deterministically pick the first one
211            first_test_name = next(iter(self._tests_state_map))
212            test = self._tests_state_map[first_test_name]
213            if test.state == "passed":
214                line = test.state
215            else:
216                line = "{}: {}".format(test.state, test.reason)
217            with open(path, mode="w") as f:
218                print(line, file=f)
219