import os
import types
from typing import Any
from typing import Dict
from typing import List
from typing import NamedTuple
from typing import Optional
from typing import Tuple

import pytest


class ATFCleanupItem(pytest.Item):
    """Donor of replacement methods that turn a test item into a cleanup run.

    The methods here are grafted onto existing items/classes by
    ``ATFHandler.override_runtest``.
    """

    def runtest(self):
        """Runs cleanup procedure for the test instead of the test"""
        instance = self.parent.cls()
        instance.cleanup(self.nodeid)

    def setup_method_noop(self, method):
        """Overrides runtest setup method"""
        pass

    def teardown_method_noop(self, method):
        """Overrides runtest teardown method"""
        pass


class ATFTestObj(object):
    """Renders the metadata of a single collected test in ATF list format."""

    # pytest mark name -> ATF metadata key ("name", defaults to the mark
    # name) and optional value formatter ("fmt") applied to the first
    # positional argument of the mark.
    _MARK_MAP: Dict[str, Dict[str, Any]] = {
        "require_user": {"name": "require.user"},
        "require_arch": {"name": "require.arch", "fmt": " ".join},
        "require_diskspace": {"name": "require.diskspace"},
        "require_files": {"name": "require.files", "fmt": " ".join},
        "require_machine": {"name": "require.machine", "fmt": " ".join},
        "require_memory": {"name": "require.memory"},
        "require_progs": {"name": "require.progs", "fmt": " ".join},
        "timeout": {},
    }

    def __init__(self, obj, has_cleanup):
        """Remember the test object and derive its ATF identifier.

        Args:
            obj: collected test object; must provide ``nodeid``, ``name``,
                ``function`` and ``iter_markers()``.
            has_cleanup: whether the test has a cleanup procedure.
        """
        # Use nodeid without name to properly name class-derived tests
        self.ident = obj.nodeid.split("::", 1)[1]
        self.description = self._get_test_description(obj)
        self.has_cleanup = has_cleanup
        self.obj = obj

    def _get_test_description(self, obj):
        """Returns first non-empty line from func docstring or func name"""
        docstr = obj.function.__doc__
        if docstr:
            for line in docstr.splitlines():
                # Strip so that indented docstring lines are not emitted
                # with leading whitespace and whitespace-only lines are
                # treated as empty.
                line = line.strip()
                if line:
                    return line
        return obj.name

    def _convert_marks(self, obj) -> Dict[str, Any]:
        """Translate the known pytest marks of ``obj`` into ATF metadata.

        Marks not listed in ``_MARK_MAP`` are ignored. The value is the
        first positional argument of the mark, passed through the
        formatter when one is defined.
        """
        ret = {}
        for mark in obj.iter_markers():
            spec = self._MARK_MAP.get(mark.name)
            if spec is None:
                continue
            value = mark.args[0]
            if "fmt" in spec:
                value = spec["fmt"](value)
            ret[spec.get("name", mark.name)] = value
        return ret

    def as_lines(self) -> List[str]:
        """Output test definition in ATF-specific format"""
        ret = [
            "ident: {}".format(self.ident),
            "descr: {}".format(self.description),
        ]
        if self.has_cleanup:
            ret.append("has.cleanup: true")
        for key, value in self._convert_marks(self.obj).items():
            ret.append("{}: {}".format(key, value))
        return ret


class ATFHandler(object):
    """Collects pytest reports and converts them to ATF test states."""

    class ReportState(NamedTuple):
        state: str
        reason: str

    # Prefixes stripped from the message of tuple-style ``longrepr`` values
    _REASON_PREFIXES: Tuple[str, ...] = ("Skipped: ",)

    def __init__(self):
        # test name -> last recorded (state, reason)
        self._tests_state_map: Dict[str, ATFHandler.ReportState] = {}

    def override_runtest(self, obj):
        """Make ``obj`` run its class cleanup() instead of the test body.

        Note that setup/teardown are replaced on the test class itself,
        not only on this item.
        """
        # Override basic runtest command
        obj.runtest = types.MethodType(ATFCleanupItem.runtest, obj)
        # Override class setup/teardown
        obj.parent.cls.setup_method = ATFCleanupItem.setup_method_noop
        obj.parent.cls.teardown_method = ATFCleanupItem.teardown_method_noop

    def get_object_cleanup_class(self, obj):
        """Return the class owning ``obj`` if it defines ``cleanup``.

        Returns None when the object has no parent, the parent has no
        class, or the class has no ``cleanup`` attribute.
        """
        parent = getattr(obj, "parent", None)
        cls = getattr(parent, "cls", None)
        if cls is not None and hasattr(cls, "cleanup"):
            return cls
        return None

    def has_object_cleanup(self, obj):
        """Tell whether ``obj`` belongs to a class defining ``cleanup``."""
        return self.get_object_cleanup_class(obj) is not None

    def list_tests(self, tests: List[Any]):
        """Print the ATF test list (header plus one stanza per test)."""
        print('Content-Type: application/X-atf-tp; version="1"')
        print()
        for test_obj in tests:
            has_cleanup = self.has_object_cleanup(test_obj)
            atf_test = ATFTestObj(test_obj, has_cleanup)
            for line in atf_test.as_lines():
                print(line)
            print()

    def set_report_state(self, test_name: str, state: str, reason: str):
        """Record (overwriting any previous value) the state of a test."""
        self._tests_state_map[test_name] = self.ReportState(state, reason)

    def _extract_report_reason(self, report) -> Optional[str]:
        """Derive a one-line reason from ``report.longrepr``.

        Returns None when there is no ``longrepr``. For tuple values the
        third element is used with a leading "Skipped: " removed; for
        anything else the last line of its string form is returned.
        """
        data = report.longrepr
        if data is None:
            return None
        if isinstance(data, tuple):
            # ('/path/to/test.py', 23, 'Skipped: unable to test')
            reason = data[2]
            for prefix in self._REASON_PREFIXES:
                if reason.startswith(prefix):
                    # Strip a single matching prefix as a whole
                    return reason[len(prefix):]
            return reason
        # string / traceback / exception report. Capture the last line
        return str(data).split("\n")[-1]

    def add_report(self, report):
        """Fold one pytest stage report into the per-test ATF state.

        Args:
            report: object providing ``location`` (test name at index 2),
                ``when``, ``outcome``, ``longrepr`` and, for xfail-marked
                tests, ``wasxfail``.
        """
        # MAP pytest report state to the atf-desired state
        #
        # ATF test states:
        # (1) expected_death, (2) expected_exit, (3) expected_failure
        # (4) expected_signal, (5) expected_timeout, (6) passed
        # (7) skipped, (8) failed
        #
        # Note that ATF doesn't have the concept of "soft xfail" - xpass
        # is a failure. It also calls teardown routine in a separate
        # process, thus teardown states (pytest-only) are handled as
        # body continuation.

        # (stage, state, wasxfail)

        # Just a passing test: WANT: passed
        # GOT: (setup, passed, F), (call, passed, F), (teardown, passed, F)
        #
        # Failing body test: WANT: failed
        # GOT: (setup, passed, F), (call, failed, F), (teardown, passed, F)
        #
        # pytest.skip test decorator: WANT: skipped
        # GOT: (setup, skipped, False), (teardown, passed, False)
        #
        # pytest.skip call inside test function: WANT: skipped
        # GOT: (setup, passed, F), (call, skipped, F), (teardown, passed, F)
        #
        # mark.xfail decorator+pytest.xfail: WANT: expected_failure
        # GOT: (setup, passed, F), (call, skipped, T), (teardown, passed, F)
        #
        # mark.xfail decorator+pass: WANT: failed
        # GOT: (setup, passed, F), (call, passed, T), (teardown, passed, F)

        test_name = report.location[2]
        stage = report.when
        state = report.outcome
        reason = self._extract_report_reason(report)
        # The xfail flag lives on the report itself, not on the reason text
        was_xfail = hasattr(report, "wasxfail")

        # We don't care about strict xfail - it gets translated to False

        if stage == "setup":
            if state in ("skipped", "failed"):
                # failed init -> failed test, skipped setup -> xskip
                # for the whole test
                self.set_report_state(test_name, state, reason)
        elif stage == "call":
            # "call" stage shouldn't matter if setup failed
            previous = self._tests_state_map.get(test_name)
            if previous is not None and previous.state == "failed":
                return
            if state == "failed":
                # Record failure & override "skipped" state
                self.set_report_state(test_name, state, reason)
            elif state == "skipped":
                if was_xfail:
                    # xfail() called in the test body
                    state = "expected_failure"
                # otherwise: plain skip inside the body
                self.set_report_state(test_name, state, reason)
            elif state == "passed":
                if was_xfail:
                    # the test was expected to fail but didn't
                    # mark as hard failure
                    state = "failed"
                self.set_report_state(test_name, state, reason)
        elif stage == "teardown":
            if state == "failed":
                # teardown should be empty, as the cleanup
                # procedures should be implemented as a separate
                # function/method, so mark teardown failure as
                # global failure
                self.set_report_state(test_name, state, reason)

    def write_report(self, path):
        """Write the ATF result line of the first recorded test to ``path``.

        Nothing is written when no state has been recorded.
        """
        if not self._tests_state_map:
            return
        # If we're executing in ATF mode, there has to be just one test
        # Anyway, deterministically pick the first one
        first_test_name = next(iter(self._tests_state_map))
        test = self._tests_state_map[first_test_name]
        if test.state == "passed":
            line = test.state
        else:
            line = "{}: {}".format(test.state, test.reason)
        with open(path, mode="w") as f:
            print(line, file=f)

    @staticmethod
    def get_atf_vars() -> Dict[str, str]:
        """Return environment variables prefixed with ``_ATF_VAR_``.

        The prefix is removed from the returned keys.
        """
        px = "_ATF_VAR_"
        return {
            k[len(px):]: v for k, v in os.environ.items() if k.startswith(px)
        }