xref: /freebsd/tests/atf_python/sys/netlink/netlink_generic.py (revision f0ffe1ce0fba262b458399dcd4c5e8c84e024606)
1#!/usr/local/bin/python3
2import struct
3from ctypes import c_int64
4from ctypes import c_long
5from ctypes import sizeof
6from ctypes import Structure
7from enum import Enum
8
9from atf_python.sys.netlink.attrs import NlAttr
10from atf_python.sys.netlink.attrs import NlAttrIp4
11from atf_python.sys.netlink.attrs import NlAttrIp6
12from atf_python.sys.netlink.attrs import NlAttrS32
13from atf_python.sys.netlink.attrs import NlAttrStr
14from atf_python.sys.netlink.attrs import NlAttrU16
15from atf_python.sys.netlink.attrs import NlAttrU32
16from atf_python.sys.netlink.attrs import NlAttrU8
17from atf_python.sys.netlink.base_headers import GenlMsgHdr
18from atf_python.sys.netlink.message import NlMsgCategory
19from atf_python.sys.netlink.message import NlMsgProps
20from atf_python.sys.netlink.message import StdNetlinkMessage
21from atf_python.sys.netlink.utils import AttrDescr
22from atf_python.sys.netlink.utils import enum_or_int
23from atf_python.sys.netlink.utils import prepare_attrs_map
24
25
26class NetlinkGenlMessage(StdNetlinkMessage):
27    messages = []
28    nl_attrs_map = {}
29    family_name = None
30
31    def __init__(self, helper, family_id, cmd=0):
32        super().__init__(helper, family_id)
33        self.base_hdr = GenlMsgHdr(cmd=enum_or_int(cmd))
34
35    def parse_base_header(self, data):
36        if len(data) < sizeof(GenlMsgHdr):
37            raise ValueError("length less than GenlMsgHdr header")
38        ghdr = GenlMsgHdr.from_buffer_copy(data)
39        return (ghdr, sizeof(GenlMsgHdr))
40
41    def _get_msg_type(self):
42        return self.base_hdr.cmd
43
44    def print_nl_header(self, prepend=""):
45        # len=44, type=RTM_DELROUTE, flags=NLM_F_REQUEST|NLM_F_ACK, seq=1641163704, pid=0  # noqa: E501
46        hdr = self.nl_hdr
47        print(
48            "{}len={}, family={}, flags={}(0x{:X}), seq={}, pid={}".format(
49                prepend,
50                hdr.nlmsg_len,
51                self.family_name,
52                self.get_nlm_flags_str(),
53                hdr.nlmsg_flags,
54                hdr.nlmsg_seq,
55                hdr.nlmsg_pid,
56            )
57        )
58
59    def print_base_header(self, hdr, prepend=""):
60        print(
61            "{}cmd={} version={} reserved={}".format(
62                prepend, self.msg_name, hdr.version, hdr.reserved
63            )
64        )
65
66
67GenlCtrlFamilyName = "nlctrl"
68
69
70class GenlCtrlMsgType(Enum):
71    CTRL_CMD_UNSPEC = 0
72    CTRL_CMD_NEWFAMILY = 1
73    CTRL_CMD_DELFAMILY = 2
74    CTRL_CMD_GETFAMILY = 3
75    CTRL_CMD_NEWOPS = 4
76    CTRL_CMD_DELOPS = 5
77    CTRL_CMD_GETOPS = 6
78    CTRL_CMD_NEWMCAST_GRP = 7
79    CTRL_CMD_DELMCAST_GRP = 8
80    CTRL_CMD_GETMCAST_GRP = 9
81    CTRL_CMD_GETPOLICY = 10
82
83
84class GenlCtrlAttrType(Enum):
85    CTRL_ATTR_FAMILY_ID = 1
86    CTRL_ATTR_FAMILY_NAME = 2
87    CTRL_ATTR_VERSION = 3
88    CTRL_ATTR_HDRSIZE = 4
89    CTRL_ATTR_MAXATTR = 5
90    CTRL_ATTR_OPS = 6
91    CTRL_ATTR_MCAST_GROUPS = 7
92    CTRL_ATTR_POLICY = 8
93    CTRL_ATTR_OP_POLICY = 9
94    CTRL_ATTR_OP = 10
95
96
97genl_ctrl_attrs = prepare_attrs_map(
98    [
99        AttrDescr(GenlCtrlAttrType.CTRL_ATTR_FAMILY_ID, NlAttrU16),
100        AttrDescr(GenlCtrlAttrType.CTRL_ATTR_FAMILY_NAME, NlAttrStr),
101        AttrDescr(GenlCtrlAttrType.CTRL_ATTR_VERSION, NlAttrU32),
102        AttrDescr(GenlCtrlAttrType.CTRL_ATTR_HDRSIZE, NlAttrU32),
103        AttrDescr(GenlCtrlAttrType.CTRL_ATTR_MAXATTR, NlAttrU32),
104    ]
105)
106
107
108class NetlinkGenlCtrlMessage(NetlinkGenlMessage):
109    messages = [
110        NlMsgProps(GenlCtrlMsgType.CTRL_CMD_NEWFAMILY, NlMsgCategory.NEW),
111        NlMsgProps(GenlCtrlMsgType.CTRL_CMD_GETFAMILY, NlMsgCategory.GET),
112        NlMsgProps(GenlCtrlMsgType.CTRL_CMD_DELFAMILY, NlMsgCategory.DELETE),
113    ]
114    nl_attrs_map = genl_ctrl_attrs
115    family_name = GenlCtrlFamilyName
116
117
118CarpFamilyName = "carp"
119
120
121class CarpMsgType(Enum):
122    CARP_NL_CMD_UNSPEC = 0
123    CARP_NL_CMD_GET = 1
124    CARP_NL_CMD_SET = 2
125
126
127class CarpAttrType(Enum):
128    CARP_NL_UNSPEC = 0
129    CARP_NL_VHID = 1
130    CARP_NL_STATE = 2
131    CARP_NL_ADVBASE = 3
132    CARP_NL_ADVSKEW = 4
133    CARP_NL_KEY = 5
134    CARP_NL_IFINDEX = 6
135    CARP_NL_ADDR = 7
136    CARP_NL_ADDR6 = 8
137    CARP_NL_IFNAME = 9
138
139
140carp_gen_attrs = prepare_attrs_map(
141    [
142        AttrDescr(CarpAttrType.CARP_NL_VHID, NlAttrU32),
143        AttrDescr(CarpAttrType.CARP_NL_STATE, NlAttrU32),
144        AttrDescr(CarpAttrType.CARP_NL_ADVBASE, NlAttrS32),
145        AttrDescr(CarpAttrType.CARP_NL_ADVSKEW, NlAttrS32),
146        AttrDescr(CarpAttrType.CARP_NL_KEY, NlAttr),
147        AttrDescr(CarpAttrType.CARP_NL_IFINDEX, NlAttrU32),
148        AttrDescr(CarpAttrType.CARP_NL_ADDR, NlAttrIp4),
149        AttrDescr(CarpAttrType.CARP_NL_ADDR6, NlAttrIp6),
150        AttrDescr(CarpAttrType.CARP_NL_IFNAME, NlAttrStr),
151    ]
152)
153
154
155class CarpGenMessage(NetlinkGenlMessage):
156    messages = [
157        NlMsgProps(CarpMsgType.CARP_NL_CMD_GET, NlMsgCategory.GET),
158        NlMsgProps(CarpMsgType.CARP_NL_CMD_SET, NlMsgCategory.NEW),
159    ]
160    nl_attrs_map = carp_gen_attrs
161    family_name = CarpFamilyName
162
163
164KtestFamilyName = "ktest"
165
166
167class KtestMsgType(Enum):
168    KTEST_CMD_UNSPEC = 0
169    KTEST_CMD_LIST = 1
170    KTEST_CMD_RUN = 2
171    KTEST_CMD_NEWTEST = 3
172    KTEST_CMD_NEWMESSAGE = 4
173
174
175class KtestAttrType(Enum):
176    KTEST_ATTR_MOD_NAME = 1
177    KTEST_ATTR_TEST_NAME = 2
178    KTEST_ATTR_TEST_DESCR = 3
179    KTEST_ATTR_TEST_META = 4
180
181
182class KtestLogMsgType(Enum):
183    KTEST_MSG_START = 1
184    KTEST_MSG_END = 2
185    KTEST_MSG_LOG = 3
186    KTEST_MSG_FAIL = 4
187
188
189class KtestMsgAttrType(Enum):
190    KTEST_MSG_ATTR_TS = 1
191    KTEST_MSG_ATTR_FUNC = 2
192    KTEST_MSG_ATTR_FILE = 3
193    KTEST_MSG_ATTR_LINE = 4
194    KTEST_MSG_ATTR_TEXT = 5
195    KTEST_MSG_ATTR_LEVEL = 6
196    KTEST_MSG_ATTR_META = 7
197
198
199class timespec(Structure):
200    _fields_ = [
201        ("tv_sec", c_int64),
202        ("tv_nsec", c_long),
203    ]
204
205
206class NlAttrTS(NlAttr):
207    DATA_LEN = sizeof(timespec)
208
209    def __init__(self, nla_type, val):
210        self.ts = val
211        super().__init__(nla_type, b"")
212
213    @property
214    def nla_len(self):
215        return NlAttr.HDR_LEN + self.DATA_LEN
216
217    def _print_attr_value(self):
218        return " tv_sec={} tv_nsec={}".format(self.ts.tv_sec, self.ts.tv_nsec)
219
220    @staticmethod
221    def _validate(data):
222        assert len(data) == NlAttr.HDR_LEN + NlAttrTS.DATA_LEN
223        nla_len, nla_type = struct.unpack("@HH", data[:NlAttr.HDR_LEN])
224        assert nla_len == NlAttr.HDR_LEN + NlAttrTS.DATA_LEN
225
226    @classmethod
227    def _parse(cls, data):
228        nla_len, nla_type = struct.unpack("@HH", data[:NlAttr.HDR_LEN])
229        val = timespec.from_buffer_copy(data[NlAttr.HDR_LEN:])
230        return cls(nla_type, val)
231
232    def __bytes__(self):
233        return self._to_bytes(bytes(self.ts))
234
235
236ktest_info_attrs = prepare_attrs_map(
237    [
238        AttrDescr(KtestAttrType.KTEST_ATTR_MOD_NAME, NlAttrStr),
239        AttrDescr(KtestAttrType.KTEST_ATTR_TEST_NAME, NlAttrStr),
240        AttrDescr(KtestAttrType.KTEST_ATTR_TEST_DESCR, NlAttrStr),
241    ]
242)
243
244
245ktest_msg_attrs = prepare_attrs_map(
246    [
247        AttrDescr(KtestMsgAttrType.KTEST_MSG_ATTR_FUNC, NlAttrStr),
248        AttrDescr(KtestMsgAttrType.KTEST_MSG_ATTR_FILE, NlAttrStr),
249        AttrDescr(KtestMsgAttrType.KTEST_MSG_ATTR_LINE, NlAttrU32),
250        AttrDescr(KtestMsgAttrType.KTEST_MSG_ATTR_TEXT, NlAttrStr),
251        AttrDescr(KtestMsgAttrType.KTEST_MSG_ATTR_LEVEL, NlAttrU8),
252        AttrDescr(KtestMsgAttrType.KTEST_MSG_ATTR_TS, NlAttrTS),
253    ]
254)
255
256
257class KtestInfoMessage(NetlinkGenlMessage):
258    messages = [
259        NlMsgProps(KtestMsgType.KTEST_CMD_LIST, NlMsgCategory.GET),
260        NlMsgProps(KtestMsgType.KTEST_CMD_RUN, NlMsgCategory.NEW),
261        NlMsgProps(KtestMsgType.KTEST_CMD_NEWTEST, NlMsgCategory.NEW),
262    ]
263    nl_attrs_map = ktest_info_attrs
264    family_name = KtestFamilyName
265
266
267class KtestMsgMessage(NetlinkGenlMessage):
268    messages = [
269        NlMsgProps(KtestMsgType.KTEST_CMD_NEWMESSAGE, NlMsgCategory.NEW),
270    ]
271    nl_attrs_map = ktest_msg_attrs
272    family_name = KtestFamilyName
273
274
275handler_classes = {
276    CarpFamilyName: [CarpGenMessage],
277    GenlCtrlFamilyName: [NetlinkGenlCtrlMessage],
278    KtestFamilyName: [KtestInfoMessage, KtestMsgMessage],
279}
280