13e5d0784SAlexander V. Chernikovimport logging 23e5d0784SAlexander V. Chernikovimport time 33e5d0784SAlexander V. Chernikovfrom typing import NamedTuple 43e5d0784SAlexander V. Chernikov 53e5d0784SAlexander V. Chernikovimport pytest 63e5d0784SAlexander V. Chernikovfrom atf_python.sys.netlink.attrs import NlAttrNested 73e5d0784SAlexander V. Chernikovfrom atf_python.sys.netlink.attrs import NlAttrStr 83e5d0784SAlexander V. Chernikovfrom atf_python.sys.netlink.netlink import NetlinkMultipartIterator 93e5d0784SAlexander V. Chernikovfrom atf_python.sys.netlink.netlink import NlHelper 103e5d0784SAlexander V. Chernikovfrom atf_python.sys.netlink.netlink import Nlsock 113e5d0784SAlexander V. Chernikovfrom atf_python.sys.netlink.netlink_generic import KtestAttrType 123e5d0784SAlexander V. Chernikovfrom atf_python.sys.netlink.netlink_generic import KtestInfoMessage 133e5d0784SAlexander V. Chernikovfrom atf_python.sys.netlink.netlink_generic import KtestLogMsgType 143e5d0784SAlexander V. Chernikovfrom atf_python.sys.netlink.netlink_generic import KtestMsgAttrType 153e5d0784SAlexander V. Chernikovfrom atf_python.sys.netlink.netlink_generic import KtestMsgType 163e5d0784SAlexander V. Chernikovfrom atf_python.sys.netlink.netlink_generic import timespec 173e5d0784SAlexander V. Chernikovfrom atf_python.sys.netlink.utils import NlConst 183e5d0784SAlexander V. Chernikovfrom atf_python.utils import BaseTest 193e5d0784SAlexander V. Chernikovfrom atf_python.utils import libc 203e5d0784SAlexander V. Chernikovfrom atf_python.utils import nodeid_to_method_name 213e5d0784SAlexander V. Chernikov 223e5d0784SAlexander V. Chernikov 233e5d0784SAlexander V. Chernikovdatefmt = "%H:%M:%S" 243e5d0784SAlexander V. Chernikovfmt = "%(asctime)s.%(msecs)03d %(filename)s:%(funcName)s:%(lineno)d %(message)s" 253e5d0784SAlexander V. Chernikovlogging.basicConfig(level=logging.DEBUG, format=fmt, datefmt=datefmt) 263e5d0784SAlexander V. Chernikovlogger = logging.getLogger("ktest") 273e5d0784SAlexander V. Chernikov 283e5d0784SAlexander V. Chernikov 293e5d0784SAlexander V. ChernikovNETLINK_FAMILY = "ktest" 303e5d0784SAlexander V. Chernikov 313e5d0784SAlexander V. Chernikov 323e5d0784SAlexander V. Chernikovclass KtestItem(pytest.Item): 333e5d0784SAlexander V. Chernikov def __init__(self, *, descr, kcls, **kwargs): 343e5d0784SAlexander V. Chernikov super().__init__(**kwargs) 353e5d0784SAlexander V. Chernikov self.descr = descr 363e5d0784SAlexander V. Chernikov self._kcls = kcls 373e5d0784SAlexander V. Chernikov 383e5d0784SAlexander V. Chernikov def runtest(self): 393e5d0784SAlexander V. Chernikov self._kcls().runtest() 403e5d0784SAlexander V. Chernikov 413e5d0784SAlexander V. Chernikov 423e5d0784SAlexander V. Chernikovclass KtestCollector(pytest.Class): 433e5d0784SAlexander V. Chernikov def collect(self): 443e5d0784SAlexander V. Chernikov obj = self.obj 453e5d0784SAlexander V. Chernikov exclude_names = set([n for n in dir(obj) if not n.startswith("_")]) 463e5d0784SAlexander V. Chernikov 473e5d0784SAlexander V. Chernikov autoload = obj.KTEST_MODULE_AUTOLOAD 483e5d0784SAlexander V. Chernikov module_name = obj.KTEST_MODULE_NAME 493e5d0784SAlexander V. Chernikov loader = KtestLoader(module_name, autoload) 503e5d0784SAlexander V. Chernikov ktests = loader.load_ktests() 513e5d0784SAlexander V. Chernikov if not ktests: 523e5d0784SAlexander V. Chernikov return 533e5d0784SAlexander V. Chernikov 543e5d0784SAlexander V. Chernikov orig = pytest.Class.from_parent(self.parent, name=self.name, obj=obj) 553e5d0784SAlexander V. Chernikov for py_test in orig.collect(): 563e5d0784SAlexander V. Chernikov yield py_test 573e5d0784SAlexander V. Chernikov 583e5d0784SAlexander V. Chernikov for ktest in ktests: 593e5d0784SAlexander V. Chernikov name = ktest["name"] 603e5d0784SAlexander V. Chernikov descr = ktest["desc"] 613e5d0784SAlexander V. Chernikov if name in exclude_names: 623e5d0784SAlexander V. Chernikov continue 633e5d0784SAlexander V. Chernikov yield KtestItem.from_parent(self, name=name, descr=descr, kcls=obj) 643e5d0784SAlexander V. Chernikov 653e5d0784SAlexander V. Chernikov 663e5d0784SAlexander V. Chernikovclass KtestLoader(object): 673e5d0784SAlexander V. Chernikov def __init__(self, module_name: str, autoload: bool): 683e5d0784SAlexander V. Chernikov self.module_name = module_name 693e5d0784SAlexander V. Chernikov self.autoload = autoload 703e5d0784SAlexander V. Chernikov self.helper = NlHelper() 713e5d0784SAlexander V. Chernikov self.nlsock = Nlsock(NlConst.NETLINK_GENERIC, self.helper) 723e5d0784SAlexander V. Chernikov self.family_id = self._get_family_id() 733e5d0784SAlexander V. Chernikov 743e5d0784SAlexander V. Chernikov def _get_family_id(self): 753e5d0784SAlexander V. Chernikov try: 763e5d0784SAlexander V. Chernikov family_id = self.nlsock.get_genl_family_id(NETLINK_FAMILY) 773e5d0784SAlexander V. Chernikov except ValueError: 783e5d0784SAlexander V. Chernikov if self.autoload: 793e5d0784SAlexander V. Chernikov libc.kldload(self.module_name) 803e5d0784SAlexander V. Chernikov family_id = self.nlsock.get_genl_family_id(NETLINK_FAMILY) 813e5d0784SAlexander V. Chernikov else: 823e5d0784SAlexander V. Chernikov raise 833e5d0784SAlexander V. Chernikov return family_id 843e5d0784SAlexander V. Chernikov 853e5d0784SAlexander V. Chernikov def _load_ktests(self): 863e5d0784SAlexander V. Chernikov msg = KtestInfoMessage(self.helper, self.family_id, KtestMsgType.KTEST_CMD_LIST) 873e5d0784SAlexander V. Chernikov msg.set_request() 883e5d0784SAlexander V. Chernikov msg.add_nla(NlAttrStr(KtestAttrType.KTEST_ATTR_MOD_NAME, self.module_name)) 893e5d0784SAlexander V. Chernikov self.nlsock.write_message(msg, verbose=False) 903e5d0784SAlexander V. Chernikov nlmsg_seq = msg.nl_hdr.nlmsg_seq 913e5d0784SAlexander V. Chernikov 923e5d0784SAlexander V. Chernikov ret = [] 933e5d0784SAlexander V. Chernikov for rx_msg in NetlinkMultipartIterator(self.nlsock, nlmsg_seq, self.family_id): 94*0eb0d233SAlexander V. Chernikov # rx_msg.print_message() 953e5d0784SAlexander V. Chernikov tst = { 963e5d0784SAlexander V. Chernikov "mod_name": rx_msg.get_nla(KtestAttrType.KTEST_ATTR_MOD_NAME).text, 973e5d0784SAlexander V. Chernikov "name": rx_msg.get_nla(KtestAttrType.KTEST_ATTR_TEST_NAME).text, 983e5d0784SAlexander V. Chernikov "desc": rx_msg.get_nla(KtestAttrType.KTEST_ATTR_TEST_DESCR).text, 993e5d0784SAlexander V. Chernikov } 1003e5d0784SAlexander V. Chernikov ret.append(tst) 1013e5d0784SAlexander V. Chernikov return ret 1023e5d0784SAlexander V. Chernikov 1033e5d0784SAlexander V. Chernikov def load_ktests(self): 1043e5d0784SAlexander V. Chernikov ret = self._load_ktests() 1053e5d0784SAlexander V. Chernikov if not ret and self.autoload: 1063e5d0784SAlexander V. Chernikov libc.kldload(self.module_name) 1073e5d0784SAlexander V. Chernikov ret = self._load_ktests() 1083e5d0784SAlexander V. Chernikov return ret 1093e5d0784SAlexander V. Chernikov 1103e5d0784SAlexander V. Chernikov 1113e5d0784SAlexander V. Chernikovdef generate_ktests(collector, name, obj): 1123e5d0784SAlexander V. Chernikov if getattr(obj, "KTEST_MODULE_NAME", None) is not None: 1133e5d0784SAlexander V. Chernikov return KtestCollector.from_parent(collector, name=name, obj=obj) 1143e5d0784SAlexander V. Chernikov return None 1153e5d0784SAlexander V. Chernikov 1163e5d0784SAlexander V. Chernikov 1173e5d0784SAlexander V. Chernikovclass BaseKernelTest(BaseTest): 1183e5d0784SAlexander V. Chernikov KTEST_MODULE_AUTOLOAD = True 1193e5d0784SAlexander V. Chernikov KTEST_MODULE_NAME = None 1203e5d0784SAlexander V. Chernikov 1213e5d0784SAlexander V. Chernikov def _get_record_time(self, msg) -> float: 1223e5d0784SAlexander V. Chernikov timespec = msg.get_nla(KtestMsgAttrType.KTEST_MSG_ATTR_TS).ts 1233e5d0784SAlexander V. Chernikov epoch_ktime = timespec.tv_sec * 1.0 + timespec.tv_nsec * 1.0 / 1000000000 1243e5d0784SAlexander V. Chernikov if not hasattr(self, "_start_epoch"): 1253e5d0784SAlexander V. Chernikov self._start_ktime = epoch_ktime 1263e5d0784SAlexander V. Chernikov self._start_time = time.time() 1273e5d0784SAlexander V. Chernikov epoch_time = self._start_time 1283e5d0784SAlexander V. Chernikov else: 1293e5d0784SAlexander V. Chernikov epoch_time = time.time() - self._start_time + epoch_ktime 1303e5d0784SAlexander V. Chernikov return epoch_time 1313e5d0784SAlexander V. Chernikov 1323e5d0784SAlexander V. Chernikov def _log_message(self, msg): 1333e5d0784SAlexander V. Chernikov # Convert syslog-type l 1343e5d0784SAlexander V. Chernikov syslog_level = msg.get_nla(KtestMsgAttrType.KTEST_MSG_ATTR_LEVEL).u8 1353e5d0784SAlexander V. Chernikov if syslog_level <= 6: 1363e5d0784SAlexander V. Chernikov loglevel = logging.INFO 1373e5d0784SAlexander V. Chernikov else: 1383e5d0784SAlexander V. Chernikov loglevel = logging.DEBUG 1393e5d0784SAlexander V. Chernikov rec = logging.LogRecord( 1403e5d0784SAlexander V. Chernikov self.KTEST_MODULE_NAME, 1413e5d0784SAlexander V. Chernikov loglevel, 1423e5d0784SAlexander V. Chernikov msg.get_nla(KtestMsgAttrType.KTEST_MSG_ATTR_FILE).text, 1433e5d0784SAlexander V. Chernikov msg.get_nla(KtestMsgAttrType.KTEST_MSG_ATTR_LINE).u32, 1443e5d0784SAlexander V. Chernikov "%s", 1453e5d0784SAlexander V. Chernikov (msg.get_nla(KtestMsgAttrType.KTEST_MSG_ATTR_TEXT).text), 1463e5d0784SAlexander V. Chernikov None, 1473e5d0784SAlexander V. Chernikov msg.get_nla(KtestMsgAttrType.KTEST_MSG_ATTR_FUNC).text, 1483e5d0784SAlexander V. Chernikov None, 1493e5d0784SAlexander V. Chernikov ) 1503e5d0784SAlexander V. Chernikov rec.created = self._get_record_time(msg) 1513e5d0784SAlexander V. Chernikov logger.handle(rec) 1523e5d0784SAlexander V. Chernikov 1533e5d0784SAlexander V. Chernikov def _runtest_name(self, test_name: str, test_data): 1543e5d0784SAlexander V. Chernikov module_name = self.KTEST_MODULE_NAME 1553e5d0784SAlexander V. Chernikov # print("Running kernel test {} for module {}".format(test_name, module_name)) 1563e5d0784SAlexander V. Chernikov helper = NlHelper() 1573e5d0784SAlexander V. Chernikov nlsock = Nlsock(NlConst.NETLINK_GENERIC, helper) 1583e5d0784SAlexander V. Chernikov family_id = nlsock.get_genl_family_id(NETLINK_FAMILY) 1593e5d0784SAlexander V. Chernikov msg = KtestInfoMessage(helper, family_id, KtestMsgType.KTEST_CMD_RUN) 1603e5d0784SAlexander V. Chernikov msg.set_request() 1613e5d0784SAlexander V. Chernikov msg.add_nla(NlAttrStr(KtestAttrType.KTEST_ATTR_MOD_NAME, module_name)) 1623e5d0784SAlexander V. Chernikov msg.add_nla(NlAttrStr(KtestAttrType.KTEST_ATTR_TEST_NAME, test_name)) 1633e5d0784SAlexander V. Chernikov if test_data is not None: 1643e5d0784SAlexander V. Chernikov msg.add_nla(NlAttrNested(KtestAttrType.KTEST_ATTR_TEST_META, test_data)) 1653e5d0784SAlexander V. Chernikov nlsock.write_message(msg, verbose=False) 1663e5d0784SAlexander V. Chernikov 1673e5d0784SAlexander V. Chernikov for log_msg in NetlinkMultipartIterator( 1683e5d0784SAlexander V. Chernikov nlsock, msg.nl_hdr.nlmsg_seq, family_id 1693e5d0784SAlexander V. Chernikov ): 1703e5d0784SAlexander V. Chernikov self._log_message(log_msg) 1713e5d0784SAlexander V. Chernikov 1723e5d0784SAlexander V. Chernikov def runtest(self, test_data=None): 1733e5d0784SAlexander V. Chernikov self._runtest_name(nodeid_to_method_name(self.test_id), test_data) 174