#!/usr/local/bin/python3
"""Generic netlink message classes for the "nlctrl" and "ktest" families."""
from ctypes import c_int64
from ctypes import c_long
from ctypes import sizeof
from ctypes import Structure
from enum import Enum
import struct

from atf_python.sys.netlink.attrs import NlAttr
from atf_python.sys.netlink.attrs import NlAttrStr
from atf_python.sys.netlink.attrs import NlAttrU16
from atf_python.sys.netlink.attrs import NlAttrU32
from atf_python.sys.netlink.attrs import NlAttrU8
from atf_python.sys.netlink.base_headers import GenlMsgHdr
from atf_python.sys.netlink.message import NlMsgCategory
from atf_python.sys.netlink.message import NlMsgProps
from atf_python.sys.netlink.message import StdNetlinkMessage
from atf_python.sys.netlink.utils import AttrDescr
from atf_python.sys.netlink.utils import prepare_attrs_map
from atf_python.sys.netlink.utils import enum_or_int


class NetlinkGenlMessage(StdNetlinkMessage):
    """Netlink message whose base header is a ``GenlMsgHdr``.

    Subclasses override ``messages``, ``nl_attrs_map`` and ``family_name``
    to describe one concrete family.
    """

    messages = []
    nl_attrs_map = {}
    family_name = None

    def __init__(self, helper, family_id, cmd=0):
        """Create a message for *family_id* carrying command *cmd*.

        *cmd* is passed through ``enum_or_int`` before being stored in the
        base header.
        """
        super().__init__(helper, family_id)
        self.base_hdr = GenlMsgHdr(cmd=enum_or_int(cmd))

    def parse_base_header(self, data):
        """Decode a ``GenlMsgHdr`` from the start of *data*.

        Returns a ``(header, header_size)`` tuple.

        Raises:
            ValueError: *data* is shorter than ``GenlMsgHdr``.
        """
        hdr_size = sizeof(GenlMsgHdr)
        if len(data) < hdr_size:
            raise ValueError("length less than GenlMsgHdr header")
        return (GenlMsgHdr.from_buffer_copy(data), hdr_size)

    def _get_msg_type(self):
        """Return the command number stored in the base header."""
        return self.base_hdr.cmd

    def print_nl_header(self, prepend=""):
        """Print the netlink header fields on one line, prefixed by *prepend*.

        The line has the form
        ``len=.., family=.., flags=..(0x..), seq=.., pid=..``.
        """
        nl_hdr = self.nl_hdr
        fields = (
            prepend,
            nl_hdr.nlmsg_len,
            self.family_name,
            self.get_nlm_flags_str(),
            nl_hdr.nlmsg_flags,
            nl_hdr.nlmsg_seq,
            nl_hdr.nlmsg_pid,
        )
        print(
            "{}len={}, family={}, flags={}(0x{:X}), seq={}, pid={}".format(
                *fields
            )
        )

    def print_base_header(self, hdr, prepend=""):
        """Print the command name plus *hdr*'s version and reserved fields."""
        fields = (prepend, self.msg_name, hdr.version, hdr.reserved)
        print("{}cmd={} version={} reserved={}".format(*fields))


GenlCtrlFamilyName = "nlctrl"


class GenlCtrlMsgType(Enum):
    """Command numbers used with the "nlctrl" family."""

    CTRL_CMD_UNSPEC = 0
    CTRL_CMD_NEWFAMILY = 1
    CTRL_CMD_DELFAMILY = 2
    CTRL_CMD_GETFAMILY = 3
    CTRL_CMD_NEWOPS = 4
    CTRL_CMD_DELOPS = 5
    CTRL_CMD_GETOPS = 6
    CTRL_CMD_NEWMCAST_GRP = 7
    CTRL_CMD_DELMCAST_GRP = 8
    CTRL_CMD_GETMCAST_GRP = 9
    CTRL_CMD_GETPOLICY = 10


class GenlCtrlAttrType(Enum):
    """Attribute type numbers used with the "nlctrl" family."""

    CTRL_ATTR_FAMILY_ID = 1
    CTRL_ATTR_FAMILY_NAME = 2
    CTRL_ATTR_VERSION = 3
    CTRL_ATTR_HDRSIZE = 4
    CTRL_ATTR_MAXATTR = 5
    CTRL_ATTR_OPS = 6
    CTRL_ATTR_MCAST_GROUPS = 7
    CTRL_ATTR_POLICY = 8
    CTRL_ATTR_OP_POLICY = 9
    CTRL_ATTR_OP = 10


# Only the attribute types listed here are mapped to a parser class.
genl_ctrl_attrs = prepare_attrs_map(
    [
        AttrDescr(attr_type, attr_cls)
        for attr_type, attr_cls in (
            (GenlCtrlAttrType.CTRL_ATTR_FAMILY_ID, NlAttrU16),
            (GenlCtrlAttrType.CTRL_ATTR_FAMILY_NAME, NlAttrStr),
            (GenlCtrlAttrType.CTRL_ATTR_VERSION, NlAttrU32),
            (GenlCtrlAttrType.CTRL_ATTR_HDRSIZE, NlAttrU32),
            (GenlCtrlAttrType.CTRL_ATTR_MAXATTR, NlAttrU32),
        )
    ]
)


class NetlinkGenlCtrlMessage(NetlinkGenlMessage):
    """Message class for the NEW/GET/DEL family commands of "nlctrl"."""

    messages = [
        NlMsgProps(GenlCtrlMsgType.CTRL_CMD_NEWFAMILY, NlMsgCategory.NEW),
        NlMsgProps(GenlCtrlMsgType.CTRL_CMD_GETFAMILY, NlMsgCategory.GET),
        NlMsgProps(GenlCtrlMsgType.CTRL_CMD_DELFAMILY, NlMsgCategory.DELETE),
    ]
    nl_attrs_map = genl_ctrl_attrs
    family_name = GenlCtrlFamilyName


KtestFamilyName = "ktest"


class KtestMsgType(Enum):
    """Command numbers used with the "ktest" family."""

    KTEST_CMD_UNSPEC = 0
    KTEST_CMD_LIST = 1
    KTEST_CMD_RUN = 2
    KTEST_CMD_NEWTEST = 3
    KTEST_CMD_NEWMESSAGE = 4


class KtestAttrType(Enum):
    """Attribute type numbers for ``KtestInfoMessage``."""

    KTEST_ATTR_MOD_NAME = 1
    KTEST_ATTR_TEST_NAME = 2
    KTEST_ATTR_TEST_DESCR = 3
    KTEST_ATTR_TEST_META = 4


class KtestLogMsgType(Enum):
    """KTEST_MSG_* identifiers; not referenced by the code visible here."""

    KTEST_MSG_START = 1
    KTEST_MSG_END = 2
    KTEST_MSG_LOG = 3
    KTEST_MSG_FAIL = 4


class KtestMsgAttrType(Enum):
    """Attribute type numbers for ``KtestMsgMessage``."""

    KTEST_MSG_ATTR_TS = 1
    KTEST_MSG_ATTR_FUNC = 2
    KTEST_MSG_ATTR_FILE = 3
    KTEST_MSG_ATTR_LINE = 4
    KTEST_MSG_ATTR_TEXT = 5
    KTEST_MSG_ATTR_LEVEL = 6
    KTEST_MSG_ATTR_META = 7


class timespec(Structure):
    """ctypes layout: a 64-bit ``tv_sec`` followed by a C long ``tv_nsec``."""

    _fields_ = [
        ("tv_sec", c_int64),
        ("tv_nsec", c_long),
    ]


class NlAttrTS(NlAttr):
    """Netlink attribute whose payload is exactly one ``timespec``."""

    DATA_LEN = sizeof(timespec)

    def __init__(self, nla_type, val):
        """Store the ``timespec`` *val*; the base class gets an empty payload."""
        self.ts = val
        super().__init__(nla_type, b"")

    @property
    def nla_len(self):
        """Attribute length: header plus the fixed ``timespec`` payload."""
        return NlAttr.HDR_LEN + self.DATA_LEN

    def _print_attr_value(self):
        """Return the timestamp fields formatted for printing."""
        ts = self.ts
        return " tv_sec={} tv_nsec={}".format(ts.tv_sec, ts.tv_nsec)

    @staticmethod
    def _validate(data):
        """Assert that *data* is one header plus one ``timespec``, no more.

        Both the buffer length and the ``nla_len`` field in the header must
        equal that size.
        """
        expected_len = NlAttr.HDR_LEN + NlAttrTS.DATA_LEN
        assert len(data) == expected_len
        nla_len, _nla_type = struct.unpack("@HH", data[:NlAttr.HDR_LEN])
        assert nla_len == expected_len

    @classmethod
    def _parse(cls, data):
        """Build an instance from raw attribute bytes (header + payload)."""
        hdr_len = NlAttr.HDR_LEN
        _nla_len, nla_type = struct.unpack("@HH", data[:hdr_len])
        payload = timespec.from_buffer_copy(data[hdr_len:])
        return cls(nla_type, payload)

    def __bytes__(self):
        """Serialize via ``_to_bytes`` with the raw ``timespec`` as payload."""
        raw_ts = bytes(self.ts)
        return self._to_bytes(raw_ts)


# KTEST_ATTR_TEST_META has no entry in this map.
ktest_info_attrs = prepare_attrs_map(
    [
        AttrDescr(attr_type, attr_cls)
        for attr_type, attr_cls in (
            (KtestAttrType.KTEST_ATTR_MOD_NAME, NlAttrStr),
            (KtestAttrType.KTEST_ATTR_TEST_NAME, NlAttrStr),
            (KtestAttrType.KTEST_ATTR_TEST_DESCR, NlAttrStr),
        )
    ]
)


# KTEST_MSG_ATTR_META has no entry in this map.
ktest_msg_attrs = prepare_attrs_map(
    [
        AttrDescr(attr_type, attr_cls)
        for attr_type, attr_cls in (
            (KtestMsgAttrType.KTEST_MSG_ATTR_FUNC, NlAttrStr),
            (KtestMsgAttrType.KTEST_MSG_ATTR_FILE, NlAttrStr),
            (KtestMsgAttrType.KTEST_MSG_ATTR_LINE, NlAttrU32),
            (KtestMsgAttrType.KTEST_MSG_ATTR_TEXT, NlAttrStr),
            (KtestMsgAttrType.KTEST_MSG_ATTR_LEVEL, NlAttrU8),
            (KtestMsgAttrType.KTEST_MSG_ATTR_TS, NlAttrTS),
        )
    ]
)


class KtestInfoMessage(NetlinkGenlMessage):
    """Message class for the LIST, RUN and NEWTEST commands of "ktest"."""

    messages = [
        NlMsgProps(KtestMsgType.KTEST_CMD_LIST, NlMsgCategory.GET),
        NlMsgProps(KtestMsgType.KTEST_CMD_RUN, NlMsgCategory.NEW),
        NlMsgProps(KtestMsgType.KTEST_CMD_NEWTEST, NlMsgCategory.NEW),
    ]
    nl_attrs_map = ktest_info_attrs
    family_name = KtestFamilyName


class KtestMsgMessage(NetlinkGenlMessage):
    """Message class for the NEWMESSAGE command of "ktest"."""

    messages = [
        NlMsgProps(KtestMsgType.KTEST_CMD_NEWMESSAGE, NlMsgCategory.NEW),
    ]
    nl_attrs_map = ktest_msg_attrs
    family_name = KtestFamilyName


# Family name -> message classes that handle that family's commands.
handler_classes = {
    GenlCtrlFamilyName: [NetlinkGenlCtrlMessage],
    KtestFamilyName: [KtestInfoMessage, KtestMsgMessage],
}