xref: /freebsd/tests/sys/netlink/test_rtnl_iface.py (revision 3873bdc2f28f6aab6b426ccb6c85ab2a41483264)
1import errno
2import socket
3
4import pytest
5from atf_python.sys.net.netlink import IflattrType
6from atf_python.sys.net.netlink import IflinkInfo
7from atf_python.sys.net.netlink import IfLinkInfoDataVlan
8from atf_python.sys.net.netlink import NetlinkIflaMessage
9from atf_python.sys.net.netlink import NlAttrNested
10from atf_python.sys.net.netlink import NlAttrStr
11from atf_python.sys.net.netlink import NlAttrStrn
12from atf_python.sys.net.netlink import NlAttrU16
13from atf_python.sys.net.netlink import NlAttrU32
14from atf_python.sys.net.netlink import NlConst
15from atf_python.sys.net.netlink import NlHelper
16from atf_python.sys.net.netlink import NlmBaseFlags
17from atf_python.sys.net.netlink import NlmNewFlags
18from atf_python.sys.net.netlink import NlMsgType
19from atf_python.sys.net.netlink import NlRtMsgType
20from atf_python.sys.net.netlink import Nlsock
21from atf_python.sys.net.vnet import SingleVnetTestTemplate
22
23
24class TestRtNlIface(SingleVnetTestTemplate):
25    REQUIRED_MODULES = ["netlink"]
26
27    def setup_method(self, method):
28        super().setup_method(method)
29        self.helper = NlHelper()
30        self.nlsock = Nlsock(NlConst.NETLINK_ROUTE, self.helper)
31
32    def write_message(self, msg):
33        print("")
34        print("============= >> TX MESSAGE =============")
35        msg.print_message()
36        self.nlsock.write_data(bytes(msg))
37        msg.print_as_bytes(bytes(msg), "-- DATA --")
38
39    def read_message(self):
40        msg = self.nlsock.read_message()
41        print("")
42        print("============= << RX MESSAGE =============")
43        msg.print_message()
44        return msg
45
46    def get_reply(self, tx_msg):
47        self.write_message(tx_msg)
48        while True:
49            rx_msg = self.read_message()
50            if tx_msg.nl_hdr.nlmsg_seq == rx_msg.nl_hdr.nlmsg_seq:
51                return rx_msg
52
53    def get_interface_byname(self, ifname):
54        msg = NetlinkIflaMessage(self.helper, NlRtMsgType.RTM_GETLINK.value)
55        msg.nl_hdr.nlmsg_flags = (
56            NlmBaseFlags.NLM_F_ACK.value | NlmBaseFlags.NLM_F_REQUEST.value
57        )
58        msg.add_nla(NlAttrStr(IflattrType.IFLA_IFNAME, ifname))
59        self.write_message(msg)
60        while True:
61            rx_msg = self.read_message()
62            if msg.nl_hdr.nlmsg_seq == rx_msg.nl_hdr.nlmsg_seq:
63                if rx_msg.is_type(NlMsgType.NLMSG_ERROR):
64                    if rx_msg.error_code != 0:
65                        raise ValueError("unable to get interface {}".format(ifname))
66                elif rx_msg.is_type(NlRtMsgType.RTM_NEWLINK):
67                    return rx_msg
68                else:
69                    raise ValueError("bad message")
70
71    def test_get_iface_byname_error(self):
72        """Tests error on fetching non-existing interface name"""
73        msg = NetlinkIflaMessage(self.helper, NlRtMsgType.RTM_GETLINK.value)
74        msg.nl_hdr.nlmsg_flags = (
75            NlmBaseFlags.NLM_F_ACK.value | NlmBaseFlags.NLM_F_REQUEST.value
76        )
77        msg.add_nla(NlAttrStr(IflattrType.IFLA_IFNAME, "lo10"))
78
79        rx_msg = self.get_reply(msg)
80        assert rx_msg.is_type(NlMsgType.NLMSG_ERROR)
81        assert rx_msg.error_code == errno.ENODEV
82
83    def test_get_iface_byindex_error(self):
84        """Tests error on fetching non-existing interface index"""
85        msg = NetlinkIflaMessage(self.helper, NlRtMsgType.RTM_GETLINK.value)
86        msg.nl_hdr.nlmsg_flags = (
87            NlmBaseFlags.NLM_F_ACK.value | NlmBaseFlags.NLM_F_REQUEST.value
88        )
89        msg.base_hdr.ifi_index = 2147483647
90
91        rx_msg = self.get_reply(msg)
92        assert rx_msg.is_type(NlMsgType.NLMSG_ERROR)
93        assert rx_msg.error_code == errno.ENODEV
94
95    @pytest.mark.require_user("root")
96    def test_create_iface_plain(self):
97        """Tests loopback creation w/o any parameters"""
98        flags = NlmNewFlags.NLM_F_EXCL.value | NlmNewFlags.NLM_F_CREATE.value
99        msg = NetlinkIflaMessage(self.helper, NlRtMsgType.RTM_NEWLINK.value)
100        msg.nl_hdr.nlmsg_flags = (
101            flags | NlmBaseFlags.NLM_F_ACK.value | NlmBaseFlags.NLM_F_REQUEST.value
102        )
103        msg.add_nla(NlAttrStr(IflattrType.IFLA_IFNAME, "lo10"))
104        msg.add_nla(
105            NlAttrNested(
106                IflattrType.IFLA_LINKINFO,
107                [
108                    NlAttrStrn(IflinkInfo.IFLA_INFO_KIND, "lo"),
109                ],
110            )
111        )
112
113        rx_msg = self.get_reply(msg)
114        assert rx_msg.is_type(NlMsgType.NLMSG_ERROR)
115        assert rx_msg.error_code == 0
116
117        self.get_interface_byname("lo10")
118
119    @pytest.mark.require_user("root")
120    def test_create_iface_attrs(self):
121        """Tests interface creation with additional properties"""
122        flags = NlmNewFlags.NLM_F_EXCL.value | NlmNewFlags.NLM_F_CREATE.value
123        msg = NetlinkIflaMessage(self.helper, NlRtMsgType.RTM_NEWLINK.value)
124        msg.nl_hdr.nlmsg_flags = (
125            flags | NlmBaseFlags.NLM_F_ACK.value | NlmBaseFlags.NLM_F_REQUEST.value
126        )
127        msg.add_nla(NlAttrStr(IflattrType.IFLA_IFNAME, "lo10"))
128        msg.add_nla(
129            NlAttrNested(
130                IflattrType.IFLA_LINKINFO,
131                [
132                    NlAttrStrn(IflinkInfo.IFLA_INFO_KIND, "lo"),
133                ],
134            )
135        )
136
137        # Custom attributes
138        msg.add_nla(NlAttrStr(IflattrType.IFLA_IFALIAS, "test description"))
139        msg.add_nla(NlAttrU32(IflattrType.IFLA_MTU, 1024))
140
141        rx_msg = self.get_reply(msg)
142        assert rx_msg.is_type(NlMsgType.NLMSG_ERROR)
143        assert rx_msg.error_code == 0
144
145        iface_msg = self.get_interface_byname("lo10")
146        assert iface_msg.get_nla(IflattrType.IFLA_IFALIAS).text == "test description"
147        assert iface_msg.get_nla(IflattrType.IFLA_MTU).u32 == 1024
148
149    @pytest.mark.require_user("root")
150    def test_modify_iface_attrs(self):
151        """Tests interface modifications"""
152        flags = NlmNewFlags.NLM_F_EXCL.value | NlmNewFlags.NLM_F_CREATE.value
153        msg = NetlinkIflaMessage(self.helper, NlRtMsgType.RTM_NEWLINK.value)
154        msg.nl_hdr.nlmsg_flags = (
155            flags | NlmBaseFlags.NLM_F_ACK.value | NlmBaseFlags.NLM_F_REQUEST.value
156        )
157        msg.add_nla(NlAttrStr(IflattrType.IFLA_IFNAME, "lo10"))
158        msg.add_nla(
159            NlAttrNested(
160                IflattrType.IFLA_LINKINFO,
161                [
162                    NlAttrStrn(IflinkInfo.IFLA_INFO_KIND, "lo"),
163                ],
164            )
165        )
166
167        rx_msg = self.get_reply(msg)
168        assert rx_msg.is_type(NlMsgType.NLMSG_ERROR)
169        assert rx_msg.error_code == 0
170
171        msg = NetlinkIflaMessage(self.helper, NlRtMsgType.RTM_NEWLINK.value)
172        msg.nl_hdr.nlmsg_flags = (
173            NlmBaseFlags.NLM_F_ACK.value | NlmBaseFlags.NLM_F_REQUEST.value
174        )
175        msg.add_nla(NlAttrStr(IflattrType.IFLA_IFNAME, "lo10"))
176
177        # Custom attributes
178        msg.add_nla(NlAttrStr(IflattrType.IFLA_IFALIAS, "test description"))
179        msg.add_nla(NlAttrU32(IflattrType.IFLA_MTU, 1024))
180
181        rx_msg = self.get_reply(msg)
182        assert rx_msg.is_type(NlMsgType.NLMSG_ERROR)
183        assert rx_msg.error_code == 0
184
185        iface_msg = self.get_interface_byname("lo10")
186        assert iface_msg.get_nla(IflattrType.IFLA_IFALIAS).text == "test description"
187        assert iface_msg.get_nla(IflattrType.IFLA_MTU).u32 == 1024
188
189    @pytest.mark.require_user("root")
190    def test_delete_iface(self):
191        """Tests interface modifications"""
192        flags = NlmNewFlags.NLM_F_EXCL.value | NlmNewFlags.NLM_F_CREATE.value
193        msg = NetlinkIflaMessage(self.helper, NlRtMsgType.RTM_NEWLINK.value)
194        msg.nl_hdr.nlmsg_flags = (
195            flags | NlmBaseFlags.NLM_F_ACK.value | NlmBaseFlags.NLM_F_REQUEST.value
196        )
197        msg.add_nla(NlAttrStr(IflattrType.IFLA_IFNAME, "lo10"))
198        msg.add_nla(
199            NlAttrNested(
200                IflattrType.IFLA_LINKINFO,
201                [
202                    NlAttrStrn(IflinkInfo.IFLA_INFO_KIND, "lo"),
203                ],
204            )
205        )
206
207        rx_msg = self.get_reply(msg)
208        assert rx_msg.is_type(NlMsgType.NLMSG_ERROR)
209        assert rx_msg.error_code == 0
210
211        iface_msg = self.get_interface_byname("lo10")
212        iface_idx = iface_msg.base_hdr.ifi_index
213
214        msg = NetlinkIflaMessage(self.helper, NlRtMsgType.RTM_DELLINK.value)
215        msg.nl_hdr.nlmsg_flags = (
216            NlmBaseFlags.NLM_F_ACK.value | NlmBaseFlags.NLM_F_REQUEST.value
217        )
218        msg.base_hdr.ifi_index = iface_idx
219        # msg.add_nla(NlAttrStr(IflattrType.IFLA_IFNAME, "lo10"))
220
221        rx_msg = self.get_reply(msg)
222        assert rx_msg.is_type(NlMsgType.NLMSG_ERROR)
223        assert rx_msg.error_code == 0
224
225        msg = NetlinkIflaMessage(self.helper, NlRtMsgType.RTM_GETLINK.value)
226        msg.nl_hdr.nlmsg_flags = (
227            NlmBaseFlags.NLM_F_ACK.value | NlmBaseFlags.NLM_F_REQUEST.value
228        )
229        msg.base_hdr.ifi_index = 2147483647
230
231        rx_msg = self.get_reply(msg)
232        assert rx_msg.is_type(NlMsgType.NLMSG_ERROR)
233        assert rx_msg.error_code == errno.ENODEV
234
235    #
236    # *
237    # * {len=76, type=RTM_NEWLINK, flags=NLM_F_REQUEST|NLM_F_ACK|NLM_F_EXCL|NLM_F_CREATE, seq=1662892737, pid=0},
238    # *  {ifi_family=AF_UNSPEC, ifi_type=ARPHRD_NETROM, ifi_index=0, ifi_flags=0, ifi_change=0},
239    # *   [
240    # *    {{nla_len=8, nla_type=IFLA_LINK}, 2},
241    # *    {{nla_len=12, nla_type=IFLA_IFNAME}, "xvlan22"},
242    # *    {{nla_len=24, nla_type=IFLA_LINKINFO},
243    # *     [
244    # *      {{nla_len=8, nla_type=IFLA_INFO_KIND}, "vlan"...},
245    # *      {{nla_len=12, nla_type=IFLA_INFO_DATA}, "\x06\x00\x01\x00\x16\x00\x00\x00"}]}]}, iov_len=76}], msg_iovlen=1, msg_controllen=0, msg_flags=0}, 0) = 76
246    # */
247    @pytest.mark.skip(reason="vlan support needs more work")
248    @pytest.mark.require_user("root")
249    def test_create_vlan_plain(self):
250        """Creates 802.1Q VLAN interface in vlanXX and ifX fashion"""
251        os_ifname = self.vnet.iface_alias_map["if1"].name
252        ifindex = socket.if_nametoindex(os_ifname)
253
254        flags = NlmNewFlags.NLM_F_EXCL.value | NlmNewFlags.NLM_F_CREATE.value
255        msg = NetlinkIflaMessage(self.helper, NlRtMsgType.RTM_NEWLINK.value)
256        msg.nl_hdr.nlmsg_flags = (
257            flags | NlmBaseFlags.NLM_F_ACK.value | NlmBaseFlags.NLM_F_REQUEST.value
258        )
259
260        msg.add_nla(NlAttrU32(IflattrType.IFLA_LINK, ifindex))
261        msg.add_nla(NlAttrStr(IflattrType.IFLA_IFNAME, "vlan22"))
262
263        msg.add_nla(
264            NlAttrNested(
265                IflattrType.IFLA_LINKINFO,
266                [
267                    NlAttrStrn(IflinkInfo.IFLA_INFO_KIND, "vlan"),
268                    NlAttrNested(
269                        IflinkInfo.IFLA_INFO_DATA,
270                        [
271                            NlAttrU16(IfLinkInfoDataVlan.IFLA_VLAN_ID, 22),
272                        ],
273                    ),
274                ],
275            )
276        )
277
278        rx_msg = self.get_reply(msg)
279        assert rx_msg.is_type(NlMsgType.NLMSG_ERROR)
280        assert rx_msg.error_code == 0
281
282        self.get_interface_byname("vlan22")
283        # ToolsHelper.print_net_debug()
284