import logging
import time
from typing import NamedTuple

import pytest
from atf_python.sys.netlink.attrs import NlAttrNested
from atf_python.sys.netlink.attrs import NlAttrStr
from atf_python.sys.netlink.netlink import NetlinkMultipartIterator
from atf_python.sys.netlink.netlink import NlHelper
from atf_python.sys.netlink.netlink import Nlsock
from atf_python.sys.netlink.netlink_generic import KtestAttrType
from atf_python.sys.netlink.netlink_generic import KtestInfoMessage
from atf_python.sys.netlink.netlink_generic import KtestLogMsgType
from atf_python.sys.netlink.netlink_generic import KtestMsgAttrType
from atf_python.sys.netlink.netlink_generic import KtestMsgType
from atf_python.sys.netlink.netlink_generic import timespec
from atf_python.sys.netlink.utils import NlConst
from atf_python.utils import BaseTest
from atf_python.utils import libc
from atf_python.utils import nodeid_to_method_name


datefmt = "%H:%M:%S"
fmt = "%(asctime)s.%(msecs)03d %(filename)s:%(funcName)s:%(lineno)d %(message)s"
logging.basicConfig(level=logging.DEBUG, format=fmt, datefmt=datefmt)
logger = logging.getLogger("ktest")


# Name of the generic netlink family used to list and run kernel tests.
NETLINK_FAMILY = "ktest"


class KtestItem(pytest.Item):
    """Pytest item representing a single kernel test.

    ``descr`` is the test description reported by the kernel module and
    ``kcls`` is the Python test class the item was collected from.
    """

    def __init__(self, *, descr, kcls, **kwargs):
        super().__init__(**kwargs)
        self.descr = descr
        self._kcls = kcls

    def runtest(self):
        """Create a fresh instance of the owning class and call its runtest()."""
        self._kcls().runtest()


class KtestCollector(pytest.Class):
    """Collector that adds kernel-provided tests to a Python test class."""

    def collect(self):
        """Yield the class's regular pytest tests followed by its kernel tests.

        Kernel tests whose name matches a public attribute of the class are
        skipped, so a Python method of the same name takes precedence.
        Nothing at all is yielded when the module reports no kernel tests.
        """
        obj = self.obj
        exclude_names = {n for n in dir(obj) if not n.startswith("_")}

        loader = KtestLoader(obj.KTEST_MODULE_NAME, obj.KTEST_MODULE_AUTOLOAD)
        ktests = loader.load_ktests()
        if not ktests:
            return

        # Collect the ordinary Python tests through a stock pytest.Class node.
        orig = pytest.Class.from_parent(self.parent, name=self.name, obj=obj)
        yield from orig.collect()

        for ktest in ktests:
            name = ktest["name"]
            if name in exclude_names:
                continue
            yield KtestItem.from_parent(
                self, name=name, descr=ktest["desc"], kcls=obj
            )


class KtestLoader(object):
    """Query the ``ktest`` generic netlink family for a module's tests."""

    def __init__(self, module_name: str, autoload: bool):
        self.module_name = module_name
        self.autoload = autoload
        self.helper = NlHelper()
        self.nlsock = Nlsock(NlConst.NETLINK_GENERIC, self.helper)
        self.family_id = self._get_family_id()

    def _get_family_id(self):
        """Return the netlink family id, loading the module once if allowed.

        Raises:
            ValueError: propagated from the family lookup when it fails and
                autoload is disabled, or when it still fails after kldload.
        """
        try:
            return self.nlsock.get_genl_family_id(NETLINK_FAMILY)
        except ValueError:
            if not self.autoload:
                raise
            libc.kldload(self.module_name)
            return self.nlsock.get_genl_family_id(NETLINK_FAMILY)

    def _load_ktests(self):
        """Send KTEST_CMD_LIST and return one dict per test in the reply.

        Each dict has the keys ``mod_name``, ``name`` and ``desc``.
        """
        msg = KtestInfoMessage(
            self.helper, self.family_id, KtestMsgType.KTEST_CMD_LIST
        )
        msg.set_request()
        msg.add_nla(NlAttrStr(KtestAttrType.KTEST_ATTR_MOD_NAME, self.module_name))
        self.nlsock.write_message(msg, verbose=False)
        nlmsg_seq = msg.nl_hdr.nlmsg_seq

        ret = []
        for rx_msg in NetlinkMultipartIterator(
            self.nlsock, nlmsg_seq, self.family_id
        ):
            ret.append(
                {
                    "mod_name": rx_msg.get_nla(
                        KtestAttrType.KTEST_ATTR_MOD_NAME
                    ).text,
                    "name": rx_msg.get_nla(KtestAttrType.KTEST_ATTR_TEST_NAME).text,
                    "desc": rx_msg.get_nla(
                        KtestAttrType.KTEST_ATTR_TEST_DESCR
                    ).text,
                }
            )
        return ret

    def load_ktests(self):
        """Return the module's tests, retrying once after kldload if empty.

        The retry only happens when autoload is enabled.
        """
        ret = self._load_ktests()
        if not ret and self.autoload:
            libc.kldload(self.module_name)
            ret = self._load_ktests()
        return ret


def generate_ktests(collector, name, obj):
    """Return a KtestCollector for classes that set KTEST_MODULE_NAME.

    Returns None for any object whose ``KTEST_MODULE_NAME`` is missing or
    None, leaving collection of that object to the caller.
    """
    if getattr(obj, "KTEST_MODULE_NAME", None) is not None:
        return KtestCollector.from_parent(collector, name=name, obj=obj)
    return None


class BaseKernelTest(BaseTest):
    """Base class for tests whose bodies run inside a kernel module.

    Subclasses set KTEST_MODULE_NAME to the module providing the tests.
    """

    KTEST_MODULE_AUTOLOAD = True
    KTEST_MODULE_NAME = None

    def _get_record_time(self, msg) -> float:
        """Translate a kernel log timestamp into a wall-clock epoch time.

        The first message seen anchors the kernel clock to ``time.time()``;
        later messages are placed relative to that anchor using the
        difference between their kernel timestamp and the first one.
        """
        ts = msg.get_nla(KtestMsgAttrType.KTEST_MSG_ATTR_TS).ts
        ktime = ts.tv_sec * 1.0 + ts.tv_nsec * 1.0 / 1000000000
        # The anchor attributes are created together, so testing one of them
        # is enough to tell whether a first message was already seen.
        if not hasattr(self, "_start_ktime"):
            self._start_ktime = ktime
            self._start_time = time.time()
        return self._start_time + (ktime - self._start_ktime)

    def _log_message(self, msg):
        """Re-emit one kernel log message through the ``ktest`` logger."""
        # Convert the syslog-type level to a Python logging level: values up
        # to 6 are reported as INFO, anything above that as DEBUG.
        syslog_level = msg.get_nla(KtestMsgAttrType.KTEST_MSG_ATTR_LEVEL).u8
        loglevel = logging.INFO if syslog_level <= 6 else logging.DEBUG
        text = msg.get_nla(KtestMsgAttrType.KTEST_MSG_ATTR_TEXT).text
        rec = logging.LogRecord(
            self.KTEST_MODULE_NAME,
            loglevel,
            msg.get_nla(KtestMsgAttrType.KTEST_MSG_ATTR_FILE).text,
            msg.get_nla(KtestMsgAttrType.KTEST_MSG_ATTR_LINE).u32,
            "%s",
            # Must be a real one-element tuple: a bare (possibly empty)
            # string is falsy and would leave the message as a literal "%s".
            (text,),
            None,
            msg.get_nla(KtestMsgAttrType.KTEST_MSG_ATTR_FUNC).text,
            None,
        )
        rec.created = self._get_record_time(msg)
        logger.handle(rec)

    def _runtest_name(self, test_name: str, test_data):
        """Ask the kernel to run ``test_name`` and log every reply message.

        ``test_data``, when not None, is attached as the nested
        KTEST_ATTR_TEST_META attribute of the request.
        """
        module_name = self.KTEST_MODULE_NAME
        helper = NlHelper()
        nlsock = Nlsock(NlConst.NETLINK_GENERIC, helper)
        family_id = nlsock.get_genl_family_id(NETLINK_FAMILY)
        msg = KtestInfoMessage(helper, family_id, KtestMsgType.KTEST_CMD_RUN)
        msg.set_request()
        msg.add_nla(NlAttrStr(KtestAttrType.KTEST_ATTR_MOD_NAME, module_name))
        msg.add_nla(NlAttrStr(KtestAttrType.KTEST_ATTR_TEST_NAME, test_name))
        if test_data is not None:
            msg.add_nla(NlAttrNested(KtestAttrType.KTEST_ATTR_TEST_META, test_data))
        nlsock.write_message(msg, verbose=False)

        for log_msg in NetlinkMultipartIterator(
            nlsock, msg.nl_hdr.nlmsg_seq, family_id
        ):
            self._log_message(log_msg)

    def runtest(self, test_data=None):
        """Run the kernel test named after the current test id."""
        self._runtest_name(nodeid_to_method_name(self.test_id), test_data)