import os
import types
from typing import Any
from typing import Dict
from typing import List
from typing import NamedTuple
from typing import Optional
from typing import Tuple  # noqa: F401  (kept: may be imported from here)

import pytest


def nodeid_to_method_name(nodeid: str) -> str:
    """Extract the bare test method name from a pytest node id.

    file_name.py::ClassName::method_name[parametrize] -> method_name
    """
    return nodeid.split("::")[-1].split("[")[0]


class ATFCleanupItem(pytest.Item):
    """Holder of replacement methods used to run a test's cleanup routine.

    The methods are not called through instances of this class; they are
    grafted onto existing test items/classes by ATFHandler.override_runtest().
    """

    def runtest(self):
        """Runs cleanup procedure for the test instead of the test itself"""
        instance = self.parent.cls()
        cleanup_name = "cleanup_{}".format(nodeid_to_method_name(self.nodeid))
        if hasattr(instance, cleanup_name):
            # A test-specific cleanup_<method> takes precedence
            cleanup = getattr(instance, cleanup_name)
            cleanup(self.nodeid)
        elif hasattr(instance, "cleanup"):
            # Fall back to the class-wide cleanup handler
            instance.cleanup(self.nodeid)

    def setup_method_noop(self, method):
        """Overrides runtest setup method"""
        pass

    def teardown_method_noop(self, method):
        """Overrides runtest teardown method"""
        pass


class ATFTestObj(object):
    """Wraps a test item and renders its metadata as ATF listing lines."""

    def __init__(self, obj, has_cleanup):
        """Store the test item and precompute its ident and description.

        obj: test item providing nodeid, name, function and iter_markers()
        has_cleanup: whether "has.cleanup: true" should be advertised
        """
        # Use nodeid without name to properly name class-derived tests
        self.ident = obj.nodeid.split("::", 1)[1]
        self.description = self._get_test_description(obj)
        self.has_cleanup = has_cleanup
        self.obj = obj

    def _get_test_description(self, obj):
        """Returns first non-empty line from func docstring or func name"""
        docstr = obj.function.__doc__
        if docstr:
            for line in docstr.split("\n"):
                # Whitespace-only lines count as empty; indentation of a
                # multi-line docstring is not part of the description.
                stripped = line.strip()
                if stripped:
                    return stripped
        return obj.name

    def _convert_marks(self, obj) -> Dict[str, Any]:
        """Translate recognised marks of the item into ATF metadata.

        Returns a dict mapping the ATF property name to the value taken
        from the first positional argument of the mark. Unknown marks are
        ignored.
        """
        wj_func = " ".join  # list-like values are rendered space-separated
        _map: Dict[str, Dict] = {
            "require_user": {"name": "require.user"},
            "require_arch": {"name": "require.arch", "fmt": wj_func},
            "require_diskspace": {"name": "require.diskspace"},
            "require_files": {"name": "require.files", "fmt": wj_func},
            "require_machine": {"name": "require.machine", "fmt": wj_func},
            "require_memory": {"name": "require.memory"},
            "require_progs": {"name": "require.progs", "fmt": wj_func},
            "timeout": {},
        }
        ret: Dict[str, Any] = {}
        for mark in obj.iter_markers():
            spec = _map.get(mark.name)
            if spec is None:
                continue
            if not mark.args:
                # Nothing to export for a mark without a positional value;
                # do not break the whole listing over it.
                continue
            name = spec.get("name", mark.name)
            if name in ret:
                # iter_markers() yields the closest mark first; keep it
                # rather than letting a class/module-level mark override.
                continue
            fmt = spec.get("fmt")
            ret[name] = fmt(mark.args[0]) if fmt else mark.args[0]
        return ret

    def as_lines(self) -> List[str]:
        """Output test definition in ATF-specific format"""
        ret = [
            "ident: {}".format(self.ident),
            "descr: {}".format(self.description),
        ]
        if self.has_cleanup:
            ret.append("has.cleanup: true")
        for key, value in self._convert_marks(self.obj).items():
            ret.append("{}: {}".format(key, value))
        return ret


class ATFHandler(object):
    """Collects pytest reports and converts them to ATF listings/results."""

    class ReportState(NamedTuple):
        # ATF-level state name ("passed", "failed", "skipped", ...)
        state: str
        # Human-readable reason; None when the report carried no longrepr
        reason: Optional[str]

    def __init__(self):
        # test name -> latest ATF state recorded for it
        self._tests_state_map: Dict[str, "ATFHandler.ReportState"] = {}

    def override_runtest(self, obj):
        """Make the test item run its cleanup routine instead of its body."""
        # Override basic runtest command
        obj.runtest = types.MethodType(ATFCleanupItem.runtest, obj)
        # Override class setup/teardown
        obj.parent.cls.setup_method = ATFCleanupItem.setup_method_noop
        obj.parent.cls.teardown_method = ATFCleanupItem.teardown_method_noop

    @staticmethod
    def get_test_class(obj):
        """Return obj.parent.cls if the item has one, otherwise None."""
        if hasattr(obj, "parent") and obj.parent is not None:
            if hasattr(obj.parent, "cls"):
                return obj.parent.cls
        return None

    def has_object_cleanup(self, obj):
        """Tell whether the test's class defines a cleanup handler.

        Either a generic "cleanup" or a test-specific "cleanup_<method>"
        attribute on the class counts.
        """
        cls = self.get_test_class(obj)
        if cls is not None:
            method_name = nodeid_to_method_name(obj.nodeid)
            cleanup_name = "cleanup_{}".format(method_name)
            if hasattr(cls, "cleanup") or hasattr(cls, cleanup_name):
                return True
        return False

    def list_tests(self, tests: List[Any]):
        """Print the ATF test program listing for the given test items."""
        print('Content-Type: application/X-atf-tp; version="1"')
        print()
        for test_obj in tests:
            has_cleanup = self.has_object_cleanup(test_obj)
            atf_test = ATFTestObj(test_obj, has_cleanup)
            for line in atf_test.as_lines():
                print(line)
            # Blank line terminates each test definition
            print()

    def set_report_state(self, test_name: str, state: str, reason: str):
        """Record (or overwrite) the state of the given test."""
        self._tests_state_map[test_name] = self.ReportState(state, reason)

    def _extract_report_reason(self, report):
        """Return a one-line reason from report.longrepr, or None if unset."""
        data = report.longrepr
        if data is None:
            return None
        if isinstance(data, tuple):
            # ('/path/to/test.py', 23, 'Skipped: unable to test')
            reason = data[2]
            # NB: this has to be a tuple of prefixes; iterating over the
            # bare string would strip single characters instead.
            for prefix in ("Skipped: ",):
                if reason.startswith(prefix):
                    reason = reason[len(prefix):]
            return reason
        # string / traceback / exception report. Capture the last line
        return str(data).split("\n")[-1]

    def add_report(self, report):
        """Fold a single pytest stage report into the per-test ATF state."""
        # MAP pytest report state to the atf-desired state
        #
        # ATF test states:
        # (1) expected_death, (2) expected_exit, (3) expected_failure
        # (4) expected_signal, (5) expected_timeout, (6) passed
        # (7) skipped, (8) failed
        #
        # Note that ATF doesn't have the concept of "soft xfail" - xpass
        # is a failure. It also calls teardown routine in a separate
        # process, thus teardown states (pytest-only) are handled as
        # body continuation.

        # (stage, state, wasxfail)

        # Just a passing test: WANT: passed
        # GOT: (setup, passed, F), (call, passed, F), (teardown, passed, F)
        #
        # Failing body test: WANT: failed
        # GOT: (setup, passed, F), (call, failed, F), (teardown, passed, F)
        #
        # pytest.skip test decorator: WANT: skipped
        # GOT: (setup, skipped, False), (teardown, passed, False)
        #
        # pytest.skip call inside test function: WANT: skipped
        # GOT: (setup, passed, F), (call, skipped, F), (teardown, passed, F)
        #
        # mark.xfail decorator+pytest.xfail: WANT: expected_failure
        # GOT: (setup, passed, F), (call, skipped, T), (teardown, passed, F)
        #
        # mark.xfail decorator+pass: WANT: failed
        # GOT: (setup, passed, F), (call, passed, T), (teardown, passed, F)

        test_name = report.location[2]
        stage = report.when
        state = report.outcome
        reason = self._extract_report_reason(report)
        # The xfail flag lives on the report itself, not on the reason text
        was_xfail = hasattr(report, "wasxfail")

        # We don't care about strict xfail - it gets translated to False

        if stage == "setup":
            if state in ("skipped", "failed"):
                # failed init -> failed test, skipped setup -> xskip
                # for the whole test
                self.set_report_state(test_name, state, reason)
        elif stage == "call":
            # "call" stage shouldn't matter if setup failed
            if test_name in self._tests_state_map:
                if self._tests_state_map[test_name].state == "failed":
                    return
            if state == "failed":
                # Record failure & override "skipped" state
                self.set_report_state(test_name, state, reason)
            elif state == "skipped":
                if was_xfail:
                    # xfail() called in the test body
                    state = "expected_failure"
                # otherwise: plain skip inside the body
                self.set_report_state(test_name, state, reason)
            elif state == "passed":
                if was_xfail:
                    # the test was expected to fail but didn't
                    # mark as hard failure
                    state = "failed"
                self.set_report_state(test_name, state, reason)
        elif stage == "teardown":
            if state == "failed":
                # teardown should be empty, as the cleanup
                # procedures should be implemented as a separate
                # function/method, so mark teardown failure as
                # global failure
                self.set_report_state(test_name, state, reason)

    def write_report(self, path):
        """Write the ATF result line of the first recorded test to path.

        Nothing is written when no test state has been recorded.
        """
        if self._tests_state_map:
            # If we're executing in ATF mode, there has to be just one test
            # Anyway, deterministically pick the first one
            first_test_name = next(iter(self._tests_state_map))
            test = self._tests_state_map[first_test_name]
            if test.state == "passed":
                line = test.state
            else:
                line = "{}: {}".format(test.state, test.reason)
            with open(path, mode="w") as f:
                print(line, file=f)

    @staticmethod
    def get_atf_vars() -> Dict[str, str]:
        """Return environment variables prefixed _ATF_VAR_, prefix removed."""
        px = "_ATF_VAR_"
        return {k[len(px):]: v for k, v in os.environ.items() if k.startswith(px)}