1import errno 2import socket 3 4import pytest 5from atf_python.sys.net.netlink import IflattrType 6from atf_python.sys.net.netlink import IflinkInfo 7from atf_python.sys.net.netlink import IfLinkInfoDataVlan 8from atf_python.sys.net.netlink import NetlinkIflaMessage 9from atf_python.sys.net.netlink import NetlinkTestTemplate 10from atf_python.sys.net.netlink import NlAttrNested 11from atf_python.sys.net.netlink import NlAttrStr 12from atf_python.sys.net.netlink import NlAttrStrn 13from atf_python.sys.net.netlink import NlAttrU16 14from atf_python.sys.net.netlink import NlAttrU32 15from atf_python.sys.net.netlink import NlConst 16from atf_python.sys.net.netlink import NlmBaseFlags 17from atf_python.sys.net.netlink import NlmNewFlags 18from atf_python.sys.net.netlink import NlMsgType 19from atf_python.sys.net.netlink import NlRtMsgType 20from atf_python.sys.net.vnet import SingleVnetTestTemplate 21 22 23class TestRtNlIface(SingleVnetTestTemplate, NetlinkTestTemplate): 24 def setup_method(self, method): 25 super().setup_method(method) 26 self.setup_netlink(NlConst.NETLINK_ROUTE) 27 28 def get_interface_byname(self, ifname): 29 msg = NetlinkIflaMessage(self.helper, NlRtMsgType.RTM_GETLINK.value) 30 msg.nl_hdr.nlmsg_flags = ( 31 NlmBaseFlags.NLM_F_ACK.value | NlmBaseFlags.NLM_F_REQUEST.value 32 ) 33 msg.add_nla(NlAttrStr(IflattrType.IFLA_IFNAME, ifname)) 34 self.write_message(msg) 35 while True: 36 rx_msg = self.read_message() 37 if msg.nl_hdr.nlmsg_seq == rx_msg.nl_hdr.nlmsg_seq: 38 if rx_msg.is_type(NlMsgType.NLMSG_ERROR): 39 if rx_msg.error_code != 0: 40 raise ValueError("unable to get interface {}".format(ifname)) 41 elif rx_msg.is_type(NlRtMsgType.RTM_NEWLINK): 42 return rx_msg 43 else: 44 raise ValueError("bad message") 45 46 def test_get_iface_byname_error(self): 47 """Tests error on fetching non-existing interface name""" 48 msg = NetlinkIflaMessage(self.helper, NlRtMsgType.RTM_GETLINK.value) 49 msg.nl_hdr.nlmsg_flags = ( 50 NlmBaseFlags.NLM_F_ACK.value | NlmBaseFlags.NLM_F_REQUEST.value 51 ) 52 msg.add_nla(NlAttrStr(IflattrType.IFLA_IFNAME, "lo10")) 53 54 rx_msg = self.get_reply(msg) 55 assert rx_msg.is_type(NlMsgType.NLMSG_ERROR) 56 assert rx_msg.error_code == errno.ENODEV 57 58 def test_get_iface_byindex_error(self): 59 """Tests error on fetching non-existing interface index""" 60 msg = NetlinkIflaMessage(self.helper, NlRtMsgType.RTM_GETLINK.value) 61 msg.nl_hdr.nlmsg_flags = ( 62 NlmBaseFlags.NLM_F_ACK.value | NlmBaseFlags.NLM_F_REQUEST.value 63 ) 64 msg.base_hdr.ifi_index = 2147483647 65 66 rx_msg = self.get_reply(msg) 67 assert rx_msg.is_type(NlMsgType.NLMSG_ERROR) 68 assert rx_msg.error_code == errno.ENODEV 69 70 @pytest.mark.require_user("root") 71 def test_create_iface_plain(self): 72 """Tests loopback creation w/o any parameters""" 73 flags = NlmNewFlags.NLM_F_EXCL.value | NlmNewFlags.NLM_F_CREATE.value 74 msg = NetlinkIflaMessage(self.helper, NlRtMsgType.RTM_NEWLINK.value) 75 msg.nl_hdr.nlmsg_flags = ( 76 flags | NlmBaseFlags.NLM_F_ACK.value | NlmBaseFlags.NLM_F_REQUEST.value 77 ) 78 msg.add_nla(NlAttrStr(IflattrType.IFLA_IFNAME, "lo10")) 79 msg.add_nla( 80 NlAttrNested( 81 IflattrType.IFLA_LINKINFO, 82 [ 83 NlAttrStrn(IflinkInfo.IFLA_INFO_KIND, "lo"), 84 ], 85 ) 86 ) 87 88 rx_msg = self.get_reply(msg) 89 assert rx_msg.is_type(NlMsgType.NLMSG_ERROR) 90 assert rx_msg.error_code == 0 91 92 self.get_interface_byname("lo10") 93 94 @pytest.mark.require_user("root") 95 def test_create_iface_attrs(self): 96 """Tests interface creation with additional properties""" 97 flags = NlmNewFlags.NLM_F_EXCL.value | NlmNewFlags.NLM_F_CREATE.value 98 msg = NetlinkIflaMessage(self.helper, NlRtMsgType.RTM_NEWLINK.value) 99 msg.nl_hdr.nlmsg_flags = ( 100 flags | NlmBaseFlags.NLM_F_ACK.value | NlmBaseFlags.NLM_F_REQUEST.value 101 ) 102 msg.add_nla(NlAttrStr(IflattrType.IFLA_IFNAME, "lo10")) 103 msg.add_nla( 104 NlAttrNested( 105 IflattrType.IFLA_LINKINFO, 106 [ 107 NlAttrStrn(IflinkInfo.IFLA_INFO_KIND, "lo"), 108 ], 109 ) 110 ) 111 112 # Custom attributes 113 msg.add_nla(NlAttrStr(IflattrType.IFLA_IFALIAS, "test description")) 114 msg.add_nla(NlAttrU32(IflattrType.IFLA_MTU, 1024)) 115 116 rx_msg = self.get_reply(msg) 117 assert rx_msg.is_type(NlMsgType.NLMSG_ERROR) 118 assert rx_msg.error_code == 0 119 120 iface_msg = self.get_interface_byname("lo10") 121 assert iface_msg.get_nla(IflattrType.IFLA_IFALIAS).text == "test description" 122 assert iface_msg.get_nla(IflattrType.IFLA_MTU).u32 == 1024 123 124 @pytest.mark.require_user("root") 125 def test_modify_iface_attrs(self): 126 """Tests interface modifications""" 127 flags = NlmNewFlags.NLM_F_EXCL.value | NlmNewFlags.NLM_F_CREATE.value 128 msg = NetlinkIflaMessage(self.helper, NlRtMsgType.RTM_NEWLINK.value) 129 msg.nl_hdr.nlmsg_flags = ( 130 flags | NlmBaseFlags.NLM_F_ACK.value | NlmBaseFlags.NLM_F_REQUEST.value 131 ) 132 msg.add_nla(NlAttrStr(IflattrType.IFLA_IFNAME, "lo10")) 133 msg.add_nla( 134 NlAttrNested( 135 IflattrType.IFLA_LINKINFO, 136 [ 137 NlAttrStrn(IflinkInfo.IFLA_INFO_KIND, "lo"), 138 ], 139 ) 140 ) 141 142 rx_msg = self.get_reply(msg) 143 assert rx_msg.is_type(NlMsgType.NLMSG_ERROR) 144 assert rx_msg.error_code == 0 145 146 msg = NetlinkIflaMessage(self.helper, NlRtMsgType.RTM_NEWLINK.value) 147 msg.nl_hdr.nlmsg_flags = ( 148 NlmBaseFlags.NLM_F_ACK.value | NlmBaseFlags.NLM_F_REQUEST.value 149 ) 150 msg.add_nla(NlAttrStr(IflattrType.IFLA_IFNAME, "lo10")) 151 152 # Custom attributes 153 msg.add_nla(NlAttrStr(IflattrType.IFLA_IFALIAS, "test description")) 154 msg.add_nla(NlAttrU32(IflattrType.IFLA_MTU, 1024)) 155 156 rx_msg = self.get_reply(msg) 157 assert rx_msg.is_type(NlMsgType.NLMSG_ERROR) 158 assert rx_msg.error_code == 0 159 160 iface_msg = self.get_interface_byname("lo10") 161 assert iface_msg.get_nla(IflattrType.IFLA_IFALIAS).text == "test description" 162 assert iface_msg.get_nla(IflattrType.IFLA_MTU).u32 == 1024 163 164 @pytest.mark.require_user("root") 165 def test_delete_iface(self): 166 """Tests interface modifications""" 167 flags = NlmNewFlags.NLM_F_EXCL.value | NlmNewFlags.NLM_F_CREATE.value 168 msg = NetlinkIflaMessage(self.helper, NlRtMsgType.RTM_NEWLINK.value) 169 msg.nl_hdr.nlmsg_flags = ( 170 flags | NlmBaseFlags.NLM_F_ACK.value | NlmBaseFlags.NLM_F_REQUEST.value 171 ) 172 msg.add_nla(NlAttrStr(IflattrType.IFLA_IFNAME, "lo10")) 173 msg.add_nla( 174 NlAttrNested( 175 IflattrType.IFLA_LINKINFO, 176 [ 177 NlAttrStrn(IflinkInfo.IFLA_INFO_KIND, "lo"), 178 ], 179 ) 180 ) 181 182 rx_msg = self.get_reply(msg) 183 assert rx_msg.is_type(NlMsgType.NLMSG_ERROR) 184 assert rx_msg.error_code == 0 185 186 iface_msg = self.get_interface_byname("lo10") 187 iface_idx = iface_msg.base_hdr.ifi_index 188 189 msg = NetlinkIflaMessage(self.helper, NlRtMsgType.RTM_DELLINK.value) 190 msg.nl_hdr.nlmsg_flags = ( 191 NlmBaseFlags.NLM_F_ACK.value | NlmBaseFlags.NLM_F_REQUEST.value 192 ) 193 msg.base_hdr.ifi_index = iface_idx 194 # msg.add_nla(NlAttrStr(IflattrType.IFLA_IFNAME, "lo10")) 195 196 rx_msg = self.get_reply(msg) 197 assert rx_msg.is_type(NlMsgType.NLMSG_ERROR) 198 assert rx_msg.error_code == 0 199 200 msg = NetlinkIflaMessage(self.helper, NlRtMsgType.RTM_GETLINK.value) 201 msg.nl_hdr.nlmsg_flags = ( 202 NlmBaseFlags.NLM_F_ACK.value | NlmBaseFlags.NLM_F_REQUEST.value 203 ) 204 msg.base_hdr.ifi_index = 2147483647 205 206 rx_msg = self.get_reply(msg) 207 assert rx_msg.is_type(NlMsgType.NLMSG_ERROR) 208 assert rx_msg.error_code == errno.ENODEV 209 210 # 211 # * 212 # * {len=76, type=RTM_NEWLINK, flags=NLM_F_REQUEST|NLM_F_ACK|NLM_F_EXCL|NLM_F_CREATE, seq=1662892737, pid=0}, 213 # * {ifi_family=AF_UNSPEC, ifi_type=ARPHRD_NETROM, ifi_index=0, ifi_flags=0, ifi_change=0}, 214 # * {{nla_len=8, nla_type=IFLA_LINK}, 2}, 215 # * {{nla_len=12, nla_type=IFLA_IFNAME}, "xvlan22"}, 216 # * {{nla_len=24, nla_type=IFLA_LINKINFO}, 217 # * {{nla_len=8, nla_type=IFLA_INFO_KIND}, "vlan"...}, 218 # * {{nla_len=12, nla_type=IFLA_INFO_DATA}, "\x06\x00\x01\x00\x16\x00\x00\x00"} 219 # */ 220 @pytest.mark.skip(reason="vlan support needs more work") 221 @pytest.mark.require_user("root") 222 def test_create_vlan_plain(self): 223 """Creates 802.1Q VLAN interface in vlanXX and ifX fashion""" 224 os_ifname = self.vnet.iface_alias_map["if1"].name 225 ifindex = socket.if_nametoindex(os_ifname) 226 227 flags = NlmNewFlags.NLM_F_EXCL.value | NlmNewFlags.NLM_F_CREATE.value 228 msg = NetlinkIflaMessage(self.helper, NlRtMsgType.RTM_NEWLINK.value) 229 msg.nl_hdr.nlmsg_flags = ( 230 flags | NlmBaseFlags.NLM_F_ACK.value | NlmBaseFlags.NLM_F_REQUEST.value 231 ) 232 233 msg.add_nla(NlAttrU32(IflattrType.IFLA_LINK, ifindex)) 234 msg.add_nla(NlAttrStr(IflattrType.IFLA_IFNAME, "vlan22")) 235 236 msg.add_nla( 237 NlAttrNested( 238 IflattrType.IFLA_LINKINFO, 239 [ 240 NlAttrStrn(IflinkInfo.IFLA_INFO_KIND, "vlan"), 241 NlAttrNested( 242 IflinkInfo.IFLA_INFO_DATA, 243 [ 244 NlAttrU16(IfLinkInfoDataVlan.IFLA_VLAN_ID, 22), 245 ], 246 ), 247 ], 248 ) 249 ) 250 251 rx_msg = self.get_reply(msg) 252 assert rx_msg.is_type(NlMsgType.NLMSG_ERROR) 253 assert rx_msg.error_code == 0 254 255 self.get_interface_byname("vlan22") 256 # ToolsHelper.print_net_debug() 257