import os
import types
from typing import Any
from typing import Dict
from typing import List
from typing import NamedTuple
from typing import Optional
from typing import Tuple

import pytest

from atf_python.ktest import generate_ktests
from atf_python.utils import nodeid_to_method_name


class ATFCleanupItem(pytest.Item):
    """Container for the methods used to turn a test item into a cleanup run.

    The methods below are written to be bound onto other objects: `runtest`
    replaces an item's own `runtest`, and the two `*_noop` methods replace
    the `setup_method` / `teardown_method` of the item's test class.
    """

    def runtest(self):
        """Runs cleanup procedure for the test instead of the test itself.

        A fresh instance of the test class (`self.parent.cls`) is created.
        A test-specific ``cleanup_<method name>`` method is preferred; when
        the class does not have one, the generic ``cleanup`` method is used.
        If neither exists, nothing is called.  The chosen method receives
        the item's node id as its only argument.
        """
        instance = self.parent.cls()
        cleanup_name = "cleanup_{}".format(nodeid_to_method_name(self.nodeid))
        # Per-test cleanup wins over the class-wide one.
        handler = getattr(instance, cleanup_name, None)
        if handler is None:
            handler = getattr(instance, "cleanup", None)
        if handler is not None:
            handler(self.nodeid)

    def setup_method_noop(self, method):
        """Overrides runtest setup method (intentionally does nothing)."""
        pass

    def teardown_method_noop(self, method):
        """Overrides runtest teardown method (intentionally does nothing)."""
        pass


class ATFTestObj(object):
    """Renders a single collected test item in the ATF test-list format."""

    def __init__(self, obj, has_cleanup):
        """Remember the test item and precompute its identifier/description.

        :param obj: collected test item; must provide `nodeid`, `name`,
            `iter_markers()` and either `descr` or `function`.
        :param has_cleanup: whether ``has.cleanup: true`` should be emitted.
        """
        # Use nodeid without name to properly name class-derived tests
        self.ident = obj.nodeid.split("::", 1)[1]
        self.description = self._get_test_description(obj)
        self.has_cleanup = has_cleanup
        self.obj = obj

    def _get_test_description(self, obj):
        """Returns first non-empty line from func docstring or func name.

        An explicit `descr` attribute on the item takes precedence over the
        docstring.  Lines are stripped, so whitespace-only lines count as
        empty and docstring indentation does not leak into the description.
        """
        descr = getattr(obj, "descr", None)
        if descr is not None:
            return descr
        docstr = obj.function.__doc__
        if docstr:
            for raw_line in docstr.split("\n"):
                line = raw_line.strip()
                if line:
                    return line
        return obj.name

    @staticmethod
    def _convert_user_mark(mark, obj, ret: Dict):
        """Translate a ``require_user`` mark into ATF metadata stored in `ret`.

        :param mark: the mark; `mark.args[0]` is the requested user name.
        :param obj: test item the mark belongs to.
        :param ret: metadata dict, updated in place.
        """
        username = mark.args[0]
        if username == "unprivileged":
            # Special unprivileged user requested.
            # First, require the unprivileged-user config option presence
            key = "require.config"
            if key not in ret:
                ret[key] = "unprivileged_user"
            else:
                ret[key] = "{} {}".format(ret[key], "unprivileged_user")
        # Check if the framework requires root
        test_cls = ATFHandler.get_test_class(obj)
        if test_cls and getattr(test_cls, "NEED_ROOT", False):
            # Yes, so we ask kyua to run us under root instead
            # It is up to the implementation to switch back to the desired
            # user
            ret["require.user"] = "root"
        else:
            ret["require.user"] = username

    def _convert_marks(self, obj) -> Dict[str, Any]:
        """Convert the recognised pytest marks of `obj` to ATF metadata.

        Marks not listed in the conversion table are ignored.  For every
        handled mark only its first positional argument is used.
        """
        join_words = " ".join
        # "handler": custom converter; "name": ATF key (defaults to the mark
        # name); "fmt": formatter applied to the first mark argument.
        conversions: Dict[str, Dict] = {
            "require_user": {"handler": self._convert_user_mark},
            "require_arch": {"name": "require.arch", "fmt": join_words},
            "require_diskspace": {"name": "require.diskspace"},
            "require_files": {"name": "require.files", "fmt": join_words},
            "require_machine": {"name": "require.machine", "fmt": join_words},
            "require_memory": {"name": "require.memory"},
            "require_progs": {"name": "require.progs", "fmt": join_words},
            "timeout": {},
        }
        ret: Dict[str, Any] = {}
        for mark in obj.iter_markers():
            spec = conversions.get(mark.name)
            if spec is None:
                continue
            if "handler" in spec:
                spec["handler"](mark, obj, ret)
                continue
            value = mark.args[0]
            if "fmt" in spec:
                value = spec["fmt"](value)
            ret[spec.get("name", mark.name)] = value
        return ret

    def as_lines(self) -> List[str]:
        """Output test definition in ATF-specific format"""
        ret = [
            "ident: {}".format(self.ident),
            "descr: {}".format(self.description),
        ]
        if self.has_cleanup:
            ret.append("has.cleanup: true")
        for key, value in self._convert_marks(self.obj).items():
            ret.append("{}: {}".format(key, value))
        return ret


class ATFHandler(object):
    """Adapts pytest collection and reporting to what ATF expects."""

    class ReportState(NamedTuple):
        # ATF-level state name, e.g. "passed", "skipped", "failed"
        state: str
        # Human-readable explanation written after the state
        reason: str

    # Prefixes pytest puts in front of the user-supplied skip reason.
    _REASON_PREFIXES = ("Skipped: ",)

    def __init__(self, report_file_name: Optional[str]):
        """
        :param report_file_name: path of the ATF result file to write, or
            None/empty to disable report writing.
        """
        self._tests_state_map: Dict[str, "ATFHandler.ReportState"] = {}
        self._report_file_name = report_file_name
        self._report_file_handle = None

    def setup_configure(self):
        """Open the report file for writing if a file name was configured."""
        fname = self._report_file_name
        if fname:
            self._report_file_handle = open(fname, mode="w")

    def setup_method_pre(self, item):
        """Called before actually running the test setup_method"""
        # Check if we need to manually drop the privileges
        for mark in item.iter_markers():
            if mark.name == "require_user":
                cls = self.get_test_class(item)
                # Items without a test class have nothing to store the
                # target user on.
                if cls is not None:
                    cls.TARGET_USER = mark.args[0]
                break

    def override_runtest(self, obj):
        """Make `obj` run its cleanup procedure instead of the test body."""
        # Override basic runtest command
        obj.runtest = types.MethodType(ATFCleanupItem.runtest, obj)
        # Override class setup/teardown
        obj.parent.cls.setup_method = ATFCleanupItem.setup_method_noop
        obj.parent.cls.teardown_method = ATFCleanupItem.teardown_method_noop

    @staticmethod
    def get_test_class(obj):
        """Return `obj.parent.cls`, or None when there is no such attribute."""
        parent = getattr(obj, "parent", None)
        if parent is None:
            return None
        return getattr(parent, "cls", None)

    def has_object_cleanup(self, obj):
        """Tell whether the test class of `obj` defines a cleanup method.

        Both the generic ``cleanup`` and the test-specific
        ``cleanup_<method name>`` are recognised.
        """
        cls = self.get_test_class(obj)
        if cls is None:
            return False
        method_name = nodeid_to_method_name(obj.nodeid)
        cleanup_name = "cleanup_{}".format(method_name)
        return hasattr(cls, "cleanup") or hasattr(cls, cleanup_name)

    def _generate_test_cleanups(self, items):
        """Keep only items with a cleanup, switching them to cleanup mode.

        `items` is modified in place.
        """
        new_items = []
        for obj in items:
            if self.has_object_cleanup(obj):
                self.override_runtest(obj)
                new_items.append(obj)
        items[:] = new_items

    def expand_tests(self, collector, name, obj):
        """Delegate to `generate_ktests` with the same arguments."""
        return generate_ktests(collector, name, obj)

    def modify_tests(self, items, config):
        """Switch the collected items to cleanup mode if `atf_cleanup` is set."""
        if config.option.atf_cleanup:
            self._generate_test_cleanups(items)

    def list_tests(self, tests: List[str]):
        """Print the test list in the ATF test program format to stdout.

        NOTE(review): despite the annotation, each element is used as a
        collected test item (it is passed to `ATFTestObj`), not as a string.
        """
        print('Content-Type: application/X-atf-tp; version="1"')
        print()
        for test_obj in tests:
            has_cleanup = self.has_object_cleanup(test_obj)
            atf_test = ATFTestObj(test_obj, has_cleanup)
            for line in atf_test.as_lines():
                print(line)
            print()

    def set_report_state(self, test_name: str, state: str, reason: str):
        """Record (or overwrite) the ATF state of `test_name`."""
        self._tests_state_map[test_name] = self.ReportState(state, reason)

    def _extract_report_reason(self, report):
        """Return a one-line reason taken from `report.longrepr`, or None.

        Tuple reports are expected to carry the message in the third
        element; a known prefix such as "Skipped: " is removed from it.
        For anything else the last non-blank line of its string form is used.
        """
        data = report.longrepr
        if data is None:
            return None
        if isinstance(data, tuple):
            # ('/path/to/test.py', 23, 'Skipped: unable to test')
            reason = data[2]
            for prefix in self._REASON_PREFIXES:
                if reason.startswith(prefix):
                    reason = reason[len(prefix):]
                    break
            return reason
        # string/ traceback / exception report. Capture the last line
        lines = str(data).rstrip().splitlines()
        return lines[-1] if lines else ""

    def add_report(self, report):
        """Fold one pytest stage report into the per-test ATF state."""
        # MAP pytest report state to the atf-desired state
        #
        # ATF test states:
        # (1) expected_death, (2) expected_exit, (3) expected_failure
        # (4) expected_signal, (5) expected_timeout, (6) passed
        # (7) skipped, (8) failed
        #
        # Note that ATF doesn't have the concept of "soft xfail" - xpass
        # is a failure. It also calls teardown routine in a separate
        # process, thus teardown states (pytest-only) are handled as
        # body continuation.

        # (stage, state, wasxfail)

        # Just a passing test: WANT: passed
        # GOT: (setup, passed, F), (call, passed, F), (teardown, passed, F)
        #
        # Failing body test: WANT: failed
        # GOT: (setup, passed, F), (call, failed, F), (teardown, passed, F)
        #
        # pytest.skip test decorator: WANT: skipped
        # GOT: (setup,skipped, False), (teardown, passed, False)
        #
        # pytest.skip call inside test function: WANT: skipped
        # GOT: (setup, passed, F), (call, skipped, F), (teardown,passed, F)
        #
        # mark.xfail decorator+pytest.xfail: WANT: expected_failure
        # GOT: (setup, passed, F), (call, skipped, T), (teardown, passed, F)
        #
        # mark.xfail decorator+pass: WANT: failed
        # GOT: (setup, passed, F), (call, passed, T), (teardown, passed, F)

        test_name = report.location[2]
        stage = report.when
        state = report.outcome
        reason = self._extract_report_reason(report)

        # We don't care about strict xfail - it gets translated to False

        if stage == "setup":
            if state in ("skipped", "failed"):
                # failed init -> failed test, skipped setup -> xskip
                # for the whole test
                self.set_report_state(test_name, state, reason)
        elif stage == "call":
            # "call" stage shouldn't matter if setup failed
            previous = self._tests_state_map.get(test_name)
            if previous is not None and previous.state == "failed":
                return
            # The xfail flag lives on the report object itself, not on the
            # extracted reason string.
            xfail_marked = hasattr(report, "wasxfail")
            if xfail_marked and reason is None:
                # No longrepr to quote: fall back to the xfail explanation
                reason = report.wasxfail
            if state == "failed":
                # Record failure & override "skipped" state
                self.set_report_state(test_name, state, reason)
            elif state == "skipped":
                if xfail_marked:
                    # xfail() called in the test body
                    state = "expected_failure"
                # otherwise: skip inside the body
                self.set_report_state(test_name, state, reason)
            elif state == "passed":
                if xfail_marked:
                    # the test was expected to fail but didn't
                    # mark as hard failure
                    state = "failed"
                self.set_report_state(test_name, state, reason)
        elif stage == "teardown":
            if state == "failed":
                # teardown should be empty, as the cleanup
                # procedures should be implemented as a separate
                # function/method, so mark teardown failure as
                # global failure
                self.set_report_state(test_name, state, reason)

    def write_report(self):
        """Write the recorded result to the report file and close it.

        Does nothing when no report file is open.  The handle is closed even
        if writing fails, and is dropped afterwards so that a repeated call
        is a no-op.
        """
        handle = self._report_file_handle
        if handle is None:
            return
        try:
            if self._tests_state_map:
                # If we're executing in ATF mode, there has to be just one test
                # Anyway, deterministically pick the first one
                first_test_name = next(iter(self._tests_state_map))
                test = self._tests_state_map[first_test_name]
                if test.state == "passed":
                    line = test.state
                else:
                    line = "{}: {}".format(test.state, test.reason)
                print(line, file=handle)
        finally:
            handle.close()
            self._report_file_handle = None

    @staticmethod
    def get_atf_vars() -> Dict[str, str]:
        """Return environment variables prefixed with ``_ATF_VAR_``.

        The prefix is removed from the returned keys.
        """
        px = "_ATF_VAR_"
        return {k[len(px):]: v for k, v in os.environ.items() if k.startswith(px)}