xref: /freebsd/tests/atf_python/sys/netlink/message.py (revision 04a036601e10237ae00655e515aeb78762eb5d1a)
1fee65b7eSAlexander V. Chernikov#!/usr/local/bin/python3
2fee65b7eSAlexander V. Chernikovimport struct
3fee65b7eSAlexander V. Chernikovfrom ctypes import sizeof
4fc2538cbSAlexander V. Chernikovfrom enum import Enum
5fee65b7eSAlexander V. Chernikovfrom typing import List
6fc2538cbSAlexander V. Chernikovfrom typing import NamedTuple
7fee65b7eSAlexander V. Chernikov
8fee65b7eSAlexander V. Chernikovfrom atf_python.sys.netlink.attrs import NlAttr
9fee65b7eSAlexander V. Chernikovfrom atf_python.sys.netlink.attrs import NlAttrNested
10fc2538cbSAlexander V. Chernikovfrom atf_python.sys.netlink.base_headers import NlmAckFlags
11fc2538cbSAlexander V. Chernikovfrom atf_python.sys.netlink.base_headers import NlmNewFlags
12fc2538cbSAlexander V. Chernikovfrom atf_python.sys.netlink.base_headers import NlmGetFlags
13fc2538cbSAlexander V. Chernikovfrom atf_python.sys.netlink.base_headers import NlmDeleteFlags
14fee65b7eSAlexander V. Chernikovfrom atf_python.sys.netlink.base_headers import NlmBaseFlags
15fee65b7eSAlexander V. Chernikovfrom atf_python.sys.netlink.base_headers import Nlmsghdr
16fee65b7eSAlexander V. Chernikovfrom atf_python.sys.netlink.base_headers import NlMsgType
17fee65b7eSAlexander V. Chernikovfrom atf_python.sys.netlink.utils import align4
18fee65b7eSAlexander V. Chernikovfrom atf_python.sys.netlink.utils import enum_or_int
19fc2538cbSAlexander V. Chernikovfrom atf_python.sys.netlink.utils import get_bitmask_str
20fc2538cbSAlexander V. Chernikov
21fc2538cbSAlexander V. Chernikov
22fc2538cbSAlexander V. Chernikovclass NlMsgCategory(Enum):
23fc2538cbSAlexander V. Chernikov    UNKNOWN = 0
24fc2538cbSAlexander V. Chernikov    GET = 1
25fc2538cbSAlexander V. Chernikov    NEW = 2
26fc2538cbSAlexander V. Chernikov    DELETE = 3
27fc2538cbSAlexander V. Chernikov    ACK = 4
28fc2538cbSAlexander V. Chernikov
29fc2538cbSAlexander V. Chernikov
30fc2538cbSAlexander V. Chernikovclass NlMsgProps(NamedTuple):
31fc2538cbSAlexander V. Chernikov    msg: Enum
32fc2538cbSAlexander V. Chernikov    category: NlMsgCategory
33fee65b7eSAlexander V. Chernikov
34fee65b7eSAlexander V. Chernikov
35fee65b7eSAlexander V. Chernikovclass BaseNetlinkMessage(object):
36fee65b7eSAlexander V. Chernikov    def __init__(self, helper, nlmsg_type):
37fee65b7eSAlexander V. Chernikov        self.nlmsg_type = enum_or_int(nlmsg_type)
38fee65b7eSAlexander V. Chernikov        self.nla_list = []
39fee65b7eSAlexander V. Chernikov        self._orig_data = None
40fee65b7eSAlexander V. Chernikov        self.helper = helper
41fee65b7eSAlexander V. Chernikov        self.nl_hdr = Nlmsghdr(
42fee65b7eSAlexander V. Chernikov            nlmsg_type=self.nlmsg_type, nlmsg_seq=helper.get_seq(), nlmsg_pid=helper.pid
43fee65b7eSAlexander V. Chernikov        )
44fee65b7eSAlexander V. Chernikov        self.base_hdr = None
45fee65b7eSAlexander V. Chernikov
46fee65b7eSAlexander V. Chernikov    def set_request(self, need_ack=True):
47fee65b7eSAlexander V. Chernikov        self.add_nlflags([NlmBaseFlags.NLM_F_REQUEST])
48fee65b7eSAlexander V. Chernikov        if need_ack:
49fee65b7eSAlexander V. Chernikov            self.add_nlflags([NlmBaseFlags.NLM_F_ACK])
50fee65b7eSAlexander V. Chernikov
51fee65b7eSAlexander V. Chernikov    def add_nlflags(self, flags: List):
52fee65b7eSAlexander V. Chernikov        int_flags = 0
53fee65b7eSAlexander V. Chernikov        for flag in flags:
54fee65b7eSAlexander V. Chernikov            int_flags |= enum_or_int(flag)
55fee65b7eSAlexander V. Chernikov        self.nl_hdr.nlmsg_flags |= int_flags
56fee65b7eSAlexander V. Chernikov
57fee65b7eSAlexander V. Chernikov    def add_nla(self, nla):
58fee65b7eSAlexander V. Chernikov        self.nla_list.append(nla)
59fee65b7eSAlexander V. Chernikov
60fee65b7eSAlexander V. Chernikov    def _get_nla(self, nla_list, nla_type):
61fee65b7eSAlexander V. Chernikov        nla_type_raw = enum_or_int(nla_type)
62fee65b7eSAlexander V. Chernikov        for nla in nla_list:
63fee65b7eSAlexander V. Chernikov            if nla.nla_type == nla_type_raw:
64fee65b7eSAlexander V. Chernikov                return nla
65fee65b7eSAlexander V. Chernikov        return None
66fee65b7eSAlexander V. Chernikov
67fee65b7eSAlexander V. Chernikov    def get_nla(self, nla_type):
68fee65b7eSAlexander V. Chernikov        return self._get_nla(self.nla_list, nla_type)
69fee65b7eSAlexander V. Chernikov
70fee65b7eSAlexander V. Chernikov    @staticmethod
71fee65b7eSAlexander V. Chernikov    def parse_nl_header(data: bytes):
72fee65b7eSAlexander V. Chernikov        if len(data) < sizeof(Nlmsghdr):
73fee65b7eSAlexander V. Chernikov            raise ValueError("length less than netlink message header")
74fee65b7eSAlexander V. Chernikov        return Nlmsghdr.from_buffer_copy(data), sizeof(Nlmsghdr)
75fee65b7eSAlexander V. Chernikov
76fee65b7eSAlexander V. Chernikov    def is_type(self, nlmsg_type):
77fee65b7eSAlexander V. Chernikov        nlmsg_type_raw = enum_or_int(nlmsg_type)
78fee65b7eSAlexander V. Chernikov        return nlmsg_type_raw == self.nl_hdr.nlmsg_type
79fee65b7eSAlexander V. Chernikov
80fee65b7eSAlexander V. Chernikov    def is_reply(self, hdr):
81fee65b7eSAlexander V. Chernikov        return hdr.nlmsg_type == NlMsgType.NLMSG_ERROR.value
82fee65b7eSAlexander V. Chernikov
83fc2538cbSAlexander V. Chernikov    @property
84fc2538cbSAlexander V. Chernikov    def msg_name(self):
85fc2538cbSAlexander V. Chernikov        return "msg#{}".format(self._get_msg_type())
86fc2538cbSAlexander V. Chernikov
87fc2538cbSAlexander V. Chernikov    def _get_nl_category(self):
88fc2538cbSAlexander V. Chernikov        if self.is_reply(self.nl_hdr):
89fc2538cbSAlexander V. Chernikov            return NlMsgCategory.ACK
90fc2538cbSAlexander V. Chernikov        return NlMsgCategory.UNKNOWN
91fc2538cbSAlexander V. Chernikov
92fc2538cbSAlexander V. Chernikov    def get_nlm_flags_str(self):
93fc2538cbSAlexander V. Chernikov        category = self._get_nl_category()
94fc2538cbSAlexander V. Chernikov        flags = self.nl_hdr.nlmsg_flags
95fc2538cbSAlexander V. Chernikov
96fc2538cbSAlexander V. Chernikov        if category == NlMsgCategory.UNKNOWN:
97fc2538cbSAlexander V. Chernikov            return self.helper.get_bitmask_str(NlmBaseFlags, flags)
98fc2538cbSAlexander V. Chernikov        elif category == NlMsgCategory.GET:
99fc2538cbSAlexander V. Chernikov            flags_enum = NlmGetFlags
100fc2538cbSAlexander V. Chernikov        elif category == NlMsgCategory.NEW:
101fc2538cbSAlexander V. Chernikov            flags_enum = NlmNewFlags
102fc2538cbSAlexander V. Chernikov        elif category == NlMsgCategory.DELETE:
103fc2538cbSAlexander V. Chernikov            flags_enum = NlmDeleteFlags
104fc2538cbSAlexander V. Chernikov        elif category == NlMsgCategory.ACK:
105fc2538cbSAlexander V. Chernikov            flags_enum = NlmAckFlags
106fc2538cbSAlexander V. Chernikov        return get_bitmask_str([NlmBaseFlags, flags_enum], flags)
107fc2538cbSAlexander V. Chernikov
108fc2538cbSAlexander V. Chernikov    def print_nl_header(self, prepend=""):
109fee65b7eSAlexander V. Chernikov        # len=44, type=RTM_DELROUTE, flags=NLM_F_REQUEST|NLM_F_ACK, seq=1641163704, pid=0  # noqa: E501
110fc2538cbSAlexander V. Chernikov        hdr = self.nl_hdr
111fee65b7eSAlexander V. Chernikov        print(
112fee65b7eSAlexander V. Chernikov            "{}len={}, type={}, flags={}(0x{:X}), seq={}, pid={}".format(
113fee65b7eSAlexander V. Chernikov                prepend,
114fee65b7eSAlexander V. Chernikov                hdr.nlmsg_len,
115fc2538cbSAlexander V. Chernikov                self.msg_name,
116fc2538cbSAlexander V. Chernikov                self.get_nlm_flags_str(),
117fee65b7eSAlexander V. Chernikov                hdr.nlmsg_flags,
118fee65b7eSAlexander V. Chernikov                hdr.nlmsg_seq,
119fee65b7eSAlexander V. Chernikov                hdr.nlmsg_pid,
120fee65b7eSAlexander V. Chernikov            )
121fee65b7eSAlexander V. Chernikov        )
122fee65b7eSAlexander V. Chernikov
123fee65b7eSAlexander V. Chernikov    @classmethod
124fee65b7eSAlexander V. Chernikov    def from_bytes(cls, helper, data):
125fee65b7eSAlexander V. Chernikov        try:
126fee65b7eSAlexander V. Chernikov            hdr, hdrlen = BaseNetlinkMessage.parse_nl_header(data)
127fee65b7eSAlexander V. Chernikov            self = cls(helper, hdr.nlmsg_type)
128fee65b7eSAlexander V. Chernikov            self._orig_data = data
129fee65b7eSAlexander V. Chernikov            self.nl_hdr = hdr
130fee65b7eSAlexander V. Chernikov        except ValueError as e:
131fee65b7eSAlexander V. Chernikov            print("Failed to parse nl header: {}".format(e))
132fee65b7eSAlexander V. Chernikov            cls.print_as_bytes(data)
133fee65b7eSAlexander V. Chernikov            raise
134fee65b7eSAlexander V. Chernikov        return self
135fee65b7eSAlexander V. Chernikov
136fee65b7eSAlexander V. Chernikov    def print_message(self):
137fc2538cbSAlexander V. Chernikov        self.print_nl_header()
138fee65b7eSAlexander V. Chernikov
139fee65b7eSAlexander V. Chernikov    @staticmethod
140fee65b7eSAlexander V. Chernikov    def print_as_bytes(data: bytes, descr: str):
141fee65b7eSAlexander V. Chernikov        print("===vv {} (len:{:3d}) vv===".format(descr, len(data)))
142fee65b7eSAlexander V. Chernikov        off = 0
143fee65b7eSAlexander V. Chernikov        step = 16
144fee65b7eSAlexander V. Chernikov        while off < len(data):
145fee65b7eSAlexander V. Chernikov            for i in range(step):
146fee65b7eSAlexander V. Chernikov                if off + i < len(data):
147fee65b7eSAlexander V. Chernikov                    print(" {:02X}".format(data[off + i]), end="")
148fee65b7eSAlexander V. Chernikov            print("")
149fee65b7eSAlexander V. Chernikov            off += step
150fee65b7eSAlexander V. Chernikov        print("--------------------")
151fee65b7eSAlexander V. Chernikov
152fee65b7eSAlexander V. Chernikov
153fee65b7eSAlexander V. Chernikovclass StdNetlinkMessage(BaseNetlinkMessage):
154fee65b7eSAlexander V. Chernikov    nl_attrs_map = {}
155fee65b7eSAlexander V. Chernikov
156fee65b7eSAlexander V. Chernikov    @classmethod
157fee65b7eSAlexander V. Chernikov    def from_bytes(cls, helper, data):
158fee65b7eSAlexander V. Chernikov        try:
159fee65b7eSAlexander V. Chernikov            hdr, hdrlen = BaseNetlinkMessage.parse_nl_header(data)
160fee65b7eSAlexander V. Chernikov            self = cls(helper, hdr.nlmsg_type)
161fee65b7eSAlexander V. Chernikov            self._orig_data = data
162fee65b7eSAlexander V. Chernikov            self.nl_hdr = hdr
163fee65b7eSAlexander V. Chernikov        except ValueError as e:
164fee65b7eSAlexander V. Chernikov            print("Failed to parse nl header: {}".format(e))
165fee65b7eSAlexander V. Chernikov            cls.print_as_bytes(data)
166fee65b7eSAlexander V. Chernikov            raise
167fee65b7eSAlexander V. Chernikov
168fee65b7eSAlexander V. Chernikov        offset = align4(hdrlen)
169fee65b7eSAlexander V. Chernikov        try:
170fee65b7eSAlexander V. Chernikov            base_hdr, hdrlen = self.parse_base_header(data[offset:])
171fee65b7eSAlexander V. Chernikov            self.base_hdr = base_hdr
172fee65b7eSAlexander V. Chernikov            offset += align4(hdrlen)
173fee65b7eSAlexander V. Chernikov            # XXX: CAP_ACK
174fee65b7eSAlexander V. Chernikov        except ValueError as e:
175fee65b7eSAlexander V. Chernikov            print("Failed to parse nl rt header: {}".format(e))
176fee65b7eSAlexander V. Chernikov            cls.print_as_bytes(data)
177fee65b7eSAlexander V. Chernikov            raise
178fee65b7eSAlexander V. Chernikov
179fee65b7eSAlexander V. Chernikov        orig_offset = offset
180fee65b7eSAlexander V. Chernikov        try:
181fee65b7eSAlexander V. Chernikov            nla_list, nla_len = self.parse_nla_list(data[offset:])
182fee65b7eSAlexander V. Chernikov            offset += nla_len
183fee65b7eSAlexander V. Chernikov            if offset != len(data):
184fee65b7eSAlexander V. Chernikov                raise ValueError(
185fee65b7eSAlexander V. Chernikov                    "{} bytes left at the end of the packet".format(len(data) - offset)
186fee65b7eSAlexander V. Chernikov                )  # noqa: E501
187fee65b7eSAlexander V. Chernikov            self.nla_list = nla_list
188fee65b7eSAlexander V. Chernikov        except ValueError as e:
189fee65b7eSAlexander V. Chernikov            print(
190fee65b7eSAlexander V. Chernikov                "Failed to parse nla attributes at offset {}: {}".format(orig_offset, e)
191fee65b7eSAlexander V. Chernikov            )  # noqa: E501
192fee65b7eSAlexander V. Chernikov            cls.print_as_bytes(data, "msg dump")
193fee65b7eSAlexander V. Chernikov            cls.print_as_bytes(data[orig_offset:], "failed block")
194fee65b7eSAlexander V. Chernikov            raise
195fee65b7eSAlexander V. Chernikov        return self
196fee65b7eSAlexander V. Chernikov
197fee65b7eSAlexander V. Chernikov    def parse_attrs(self, data: bytes, attr_map):
198fee65b7eSAlexander V. Chernikov        ret = []
199fee65b7eSAlexander V. Chernikov        off = 0
200fee65b7eSAlexander V. Chernikov        while len(data) - off >= 4:
201fee65b7eSAlexander V. Chernikov            nla_len, raw_nla_type = struct.unpack("@HH", data[off:off + 4])
202fee65b7eSAlexander V. Chernikov            if nla_len + off > len(data):
203fee65b7eSAlexander V. Chernikov                raise ValueError(
204fee65b7eSAlexander V. Chernikov                    "attr length {} > than the remaining length {}".format(
205fee65b7eSAlexander V. Chernikov                        nla_len, len(data) - off
206fee65b7eSAlexander V. Chernikov                    )
207fee65b7eSAlexander V. Chernikov                )
208*04a03660SAlexander V. Chernikov            nla_type = raw_nla_type & 0x3FFF
209fee65b7eSAlexander V. Chernikov            if nla_type in attr_map:
210fee65b7eSAlexander V. Chernikov                v = attr_map[nla_type]
211fee65b7eSAlexander V. Chernikov                val = v["ad"].cls.from_bytes(data[off:off + nla_len], v["ad"].val)
212fee65b7eSAlexander V. Chernikov                if "child" in v:
213fee65b7eSAlexander V. Chernikov                    # nested
214fee65b7eSAlexander V. Chernikov                    attrs, _ = self.parse_attrs(
215fee65b7eSAlexander V. Chernikov                        data[off + 4:off + nla_len], v["child"]
216fee65b7eSAlexander V. Chernikov                    )
217fee65b7eSAlexander V. Chernikov                    val = NlAttrNested(v["ad"].val, attrs)
218fee65b7eSAlexander V. Chernikov            else:
219fee65b7eSAlexander V. Chernikov                # unknown attribute
220fee65b7eSAlexander V. Chernikov                val = NlAttr(raw_nla_type, data[off + 4:off + nla_len])
221fee65b7eSAlexander V. Chernikov            ret.append(val)
222fee65b7eSAlexander V. Chernikov            off += align4(nla_len)
223fee65b7eSAlexander V. Chernikov        return ret, off
224fee65b7eSAlexander V. Chernikov
225fee65b7eSAlexander V. Chernikov    def parse_nla_list(self, data: bytes) -> List[NlAttr]:
226fee65b7eSAlexander V. Chernikov        return self.parse_attrs(data, self.nl_attrs_map)
227fee65b7eSAlexander V. Chernikov
228fee65b7eSAlexander V. Chernikov    def __bytes__(self):
229fee65b7eSAlexander V. Chernikov        ret = bytes()
230fee65b7eSAlexander V. Chernikov        for nla in self.nla_list:
231fee65b7eSAlexander V. Chernikov            ret += bytes(nla)
232fee65b7eSAlexander V. Chernikov        ret = bytes(self.base_hdr) + ret
233fee65b7eSAlexander V. Chernikov        self.nl_hdr.nlmsg_len = len(ret) + sizeof(Nlmsghdr)
234fee65b7eSAlexander V. Chernikov        return bytes(self.nl_hdr) + ret
235fee65b7eSAlexander V. Chernikov
236fc2538cbSAlexander V. Chernikov    def _get_msg_type(self):
237fc2538cbSAlexander V. Chernikov        return self.nl_hdr.nlmsg_type
238fc2538cbSAlexander V. Chernikov
239fc2538cbSAlexander V. Chernikov    @property
240fc2538cbSAlexander V. Chernikov    def msg_props(self):
241fc2538cbSAlexander V. Chernikov        msg_type = self._get_msg_type()
242fc2538cbSAlexander V. Chernikov        for msg_props in self.messages:
243fc2538cbSAlexander V. Chernikov            if msg_props.msg.value == msg_type:
244fc2538cbSAlexander V. Chernikov                return msg_props
245fc2538cbSAlexander V. Chernikov        return None
246fc2538cbSAlexander V. Chernikov
247fc2538cbSAlexander V. Chernikov    @property
248fc2538cbSAlexander V. Chernikov    def msg_name(self):
249fc2538cbSAlexander V. Chernikov        msg_props = self.msg_props
250fc2538cbSAlexander V. Chernikov        if msg_props is not None:
251fc2538cbSAlexander V. Chernikov            return msg_props.msg.name
252fc2538cbSAlexander V. Chernikov        return super().msg_name
253fc2538cbSAlexander V. Chernikov
254fee65b7eSAlexander V. Chernikov    def print_base_header(self, hdr, prepend=""):
255fee65b7eSAlexander V. Chernikov        pass
256fee65b7eSAlexander V. Chernikov
257fee65b7eSAlexander V. Chernikov    def print_message(self):
258fc2538cbSAlexander V. Chernikov        self.print_nl_header()
259fee65b7eSAlexander V. Chernikov        self.print_base_header(self.base_hdr, " ")
260fee65b7eSAlexander V. Chernikov        for nla in self.nla_list:
261fee65b7eSAlexander V. Chernikov            nla.print_attr("  ")
262