import types
from typing import Any
from typing import Dict
from typing import List
from typing import NamedTuple
from typing import Optional
from typing import Tuple

import pytest


def _join_words(value):
    """Return `value` as a single space-separated string.

    A plain string is returned unchanged (joining it would put a space
    between every character); any other iterable of strings is joined
    with single spaces.
    """
    if isinstance(value, str):
        return value
    return " ".join(value)


class ATFCleanupItem(pytest.Item):
    """Holder for the methods that get grafted onto a test item in order
    to run its cleanup procedure instead of the test body.
    """

    def runtest(self):
        """Runs cleanup procedure for the test instead of the test"""
        instance = self.parent.cls()
        instance.cleanup(self.nodeid)

    def setup_method_noop(self, method):
        """Overrides runtest setup method"""
        pass

    def teardown_method_noop(self, method):
        """Overrides runtest teardown method"""
        pass


class ATFTestObj(object):
    """Description of a single test, renderable in the ATF list format."""

    # pytest mark name -> {"name": ATF metadata key, "fmt": value formatter}.
    # A missing "name" means the mark name is used as the key verbatim;
    # a missing "fmt" means the first mark argument is emitted as-is.
    _MARK_MAP: Dict[str, Dict[str, Any]] = {
        "require_user": {"name": "require.user"},
        "require_arch": {"name": "require.arch", "fmt": _join_words},
        "require_diskspace": {"name": "require.diskspace"},
        "require_files": {"name": "require.files", "fmt": _join_words},
        "require_machine": {"name": "require.machine", "fmt": _join_words},
        "require_memory": {"name": "require.memory"},
        "require_progs": {"name": "require.progs", "fmt": _join_words},
        "timeout": {},
    }

    def __init__(self, obj, has_cleanup):
        """Store the test item and precompute its identifier/description.

        obj: test item exposing `nodeid`, `name`, `function` and
            `iter_markers()`.
        has_cleanup: whether "has.cleanup: true" should be advertised.
        """
        # Use nodeid without name to properly name class-derived tests
        self.ident = obj.nodeid.split("::", 1)[1]
        self.description = self._get_test_description(obj)
        self.has_cleanup = has_cleanup
        self.obj = obj

    def _get_test_description(self, obj):
        """Returns first non-blank line from func docstring or func name

        The returned line is stripped of surrounding whitespace, so an
        indented docstring body does not leak its indentation and a
        whitespace-only line is not mistaken for a description.
        """
        docstr = obj.function.__doc__
        if docstr:
            for line in docstr.split("\n"):
                stripped = line.strip()
                if stripped:
                    return stripped
        return obj.name

    def _convert_marks(self, obj) -> Dict[str, Any]:
        """Translate the recognised pytest marks of `obj` to ATF metadata.

        Marks not listed in `_MARK_MAP` are ignored.  The value is taken
        from the first positional argument of the mark.
        """
        ret = {}
        for mark in obj.iter_markers():
            spec = self._MARK_MAP.get(mark.name)
            if spec is None:
                continue
            value = mark.args[0]
            if "fmt" in spec:
                value = spec["fmt"](value)
            ret[spec.get("name", mark.name)] = value
        return ret

    def as_lines(self) -> List[str]:
        """Output test definition in ATF-specific format"""
        ret = [
            "ident: {}".format(self.ident),
            "descr: {}".format(self.description),
        ]
        if self.has_cleanup:
            ret.append("has.cleanup: true")
        for key, value in self._convert_marks(self.obj).items():
            ret.append("{}: {}".format(key, value))
        return ret


class ATFHandler(object):
    """Lists tests in ATF format and maps pytest reports to ATF results."""

    class ReportState(NamedTuple):
        state: str
        reason: Optional[str]

    def __init__(self):
        # test name -> last recorded (state, reason)
        self._tests_state_map: Dict[str, ATFHandler.ReportState] = {}

    def override_runtest(self, obj):
        """Make `obj` run its class' cleanup() instead of the test body."""
        # Override basic runtest command
        obj.runtest = types.MethodType(ATFCleanupItem.runtest, obj)
        # Override class setup/teardown
        obj.parent.cls.setup_method = ATFCleanupItem.setup_method_noop
        obj.parent.cls.teardown_method = ATFCleanupItem.teardown_method_noop

    def get_object_cleanup_class(self, obj):
        """Return the class of `obj` if it defines `cleanup`, else None."""
        parent = getattr(obj, "parent", None)
        cls = getattr(parent, "cls", None)
        if cls is not None and hasattr(cls, "cleanup"):
            return cls
        return None

    def has_object_cleanup(self, obj):
        """Tell whether the class owning `obj` provides a cleanup method."""
        return self.get_object_cleanup_class(obj) is not None

    def list_tests(self, tests: List[Any]):
        """Print the ATF test list for the given test items to stdout."""
        print('Content-Type: application/X-atf-tp; version="1"')
        print()
        for test_obj in tests:
            has_cleanup = self.has_object_cleanup(test_obj)
            atf_test = ATFTestObj(test_obj, has_cleanup)
            for line in atf_test.as_lines():
                print(line)
            print()

    def set_report_state(self, test_name: str, state: str, reason: str):
        """Record (overwriting any previous value) the state of a test."""
        self._tests_state_map[test_name] = self.ReportState(state, reason)

    def _extract_report_reason(self, report):
        """Return a one-line reason from `report.longrepr`, or None.

        For the tuple form the third element is used, with a leading
        "Skipped: " removed.  For anything else the last line of its
        string representation is returned.
        """
        data = report.longrepr
        if data is None:
            return None
        if isinstance(data, tuple):
            # ('/path/to/test.py', 23, 'Skipped: unable to test')
            reason = data[2]
            prefix = "Skipped: "
            if reason.startswith(prefix):
                reason = reason[len(prefix):]
            return reason
        # string / traceback / exception report. Capture the last line
        return str(data).split("\n")[-1]

    def add_report(self, report):
        """Fold one pytest stage report into the per-test ATF state."""
        # MAP pytest report state to the atf-desired state
        #
        # ATF test states:
        # (1) expected_death, (2) expected_exit, (3) expected_failure
        # (4) expected_signal, (5) expected_timeout, (6) passed
        # (7) skipped, (8) failed
        #
        # Note that ATF doesn't have the concept of "soft xfail" - xpass
        # is a failure. It also calls teardown routine in a separate
        # process, thus teardown states (pytest-only) are handled as
        # body continuation.

        # (stage, state, wasxfail)

        # Just a passing test: WANT: passed
        # GOT: (setup, passed, F), (call, passed, F), (teardown, passed, F)
        #
        # Failing body test: WANT: failed
        # GOT: (setup, passed, F), (call, failed, F), (teardown, passed, F)
        #
        # pytest.skip test decorator: WANT: skipped
        # GOT: (setup, skipped, False), (teardown, passed, False)
        #
        # pytest.skip call inside test function: WANT: skipped
        # GOT: (setup, passed, F), (call, skipped, F), (teardown, passed, F)
        #
        # mark.xfail decorator+pytest.xfail: WANT: expected_failure
        # GOT: (setup, passed, F), (call, skipped, T), (teardown, passed, F)
        #
        # mark.xfail decorator+pass: WANT: failed
        # GOT: (setup, passed, F), (call, passed, T), (teardown, passed, F)

        test_name = report.location[2]
        stage = report.when
        state = report.outcome
        reason = self._extract_report_reason(report)
        # The xfail marker lives on the report itself, not on the reason
        was_xfail = hasattr(report, "wasxfail")

        # We don't care about strict xfail - it gets translated to False

        if stage == "setup":
            if state in ("skipped", "failed"):
                # failed init -> failed test, skipped setup -> xskip
                # for the whole test
                self.set_report_state(test_name, state, reason)
        elif stage == "call":
            # "call" stage shouldn't matter if setup failed
            previous = self._tests_state_map.get(test_name)
            if previous is not None and previous.state == "failed":
                return
            if state not in ("failed", "skipped", "passed"):
                return
            if was_xfail:
                if state == "skipped":
                    # xfail() called in the test body
                    state = "expected_failure"
                elif state == "passed":
                    # the test was expected to fail but didn't
                    # mark as hard failure
                    state = "failed"
            # A failure recorded here also overrides a "skipped" state
            self.set_report_state(test_name, state, reason)
        elif stage == "teardown":
            if state == "failed":
                # teardown should be empty, as the cleanup
                # procedures should be implemented as a separate
                # function/method, so mark teardown failure as
                # global failure
                self.set_report_state(test_name, state, reason)

    def write_report(self, path):
        """Write the ATF result line of the first recorded test to `path`.

        Nothing is written when no state has been recorded.
        """
        if not self._tests_state_map:
            return
        # If we're executing in ATF mode, there has to be just one test
        # Anyway, deterministically pick the first one
        first_test_name = next(iter(self._tests_state_map))
        test = self._tests_state_map[first_test_name]
        if test.state == "passed":
            line = test.state
        else:
            line = "{}: {}".format(test.state, test.reason)
        with open(path, mode="w") as f:
            print(line, file=f)