import errno
import socket

import pytest
from atf_python.sys.net.netlink import IflattrType
from atf_python.sys.net.netlink import IflinkInfo
from atf_python.sys.net.netlink import IfLinkInfoDataVlan
from atf_python.sys.net.netlink import NetlinkIflaMessage
from atf_python.sys.net.netlink import NetlinkTestTemplate
from atf_python.sys.net.netlink import NlAttrNested
from atf_python.sys.net.netlink import NlAttrStr
from atf_python.sys.net.netlink import NlAttrStrn
from atf_python.sys.net.netlink import NlAttrU16
from atf_python.sys.net.netlink import NlAttrU32
from atf_python.sys.net.netlink import NlConst
from atf_python.sys.net.netlink import NlmBaseFlags
from atf_python.sys.net.netlink import NlmNewFlags
from atf_python.sys.net.netlink import NlMsgType
from atf_python.sys.net.netlink import NlRtMsgType
from atf_python.sys.net.netlink import rtnl_ifla_attrs
from atf_python.sys.net.vnet import SingleVnetTestTemplate


class TestRtNlIface(NetlinkTestTemplate, SingleVnetTestTemplate):
    """Tests RTM_GETLINK/RTM_NEWLINK/RTM_DELLINK handling via NETLINK_ROUTE"""

    def setup_method(self, method):
        """Runs the parent setup and opens a NETLINK_ROUTE socket"""
        super().setup_method(method)
        self.setup_netlink(NlConst.NETLINK_ROUTE)

    def _new_request(self, msg_type, extra_flags=0):
        """Returns an empty link message of msg_type flagged REQUEST|ACK

        extra_flags is OR-ed into the header flags (e.g. NLM_F_CREATE).
        """
        msg = NetlinkIflaMessage(self.helper, msg_type.value)
        msg.nl_hdr.nlmsg_flags = (
            extra_flags
            | NlmBaseFlags.NLM_F_ACK.value
            | NlmBaseFlags.NLM_F_REQUEST.value
        )
        return msg

    def _new_loopback_request(self, ifname):
        """Returns an RTM_NEWLINK (EXCL|CREATE) request for loopback ifname"""
        create_flags = NlmNewFlags.NLM_F_EXCL.value | NlmNewFlags.NLM_F_CREATE.value
        msg = self._new_request(NlRtMsgType.RTM_NEWLINK, create_flags)
        msg.add_nla(NlAttrStr(IflattrType.IFLA_IFNAME, ifname))
        msg.add_nla(
            NlAttrNested(
                IflattrType.IFLA_LINKINFO,
                [
                    NlAttrStrn(IflinkInfo.IFLA_INFO_KIND, "lo"),
                ],
            )
        )
        return msg

    @staticmethod
    def _assert_errno(rx_msg, expected=0):
        """Asserts rx_msg is an NLMSG_ERROR reply with error_code == expected

        The default of 0 is what the tests treat as a successful ACK.
        """
        assert rx_msg.is_type(NlMsgType.NLMSG_ERROR)
        assert rx_msg.error_code == expected

    @staticmethod
    def _parse_cookie_attrs(rx_msg):
        """Returns {nla_type: attr} parsed from the reply cookie"""
        assert rx_msg.cookie is not None
        # The first 4 bytes of the cookie are skipped before parsing;
        # presumably this is the enclosing attribute header - TODO confirm.
        nla_list, _ = rx_msg.parse_attrs(bytes(rx_msg.cookie)[4:], rtnl_ifla_attrs)
        return {n.nla_type: n for n in nla_list}

    def get_interface_byname(self, ifname):
        """Returns the RTM_NEWLINK reply describing interface ifname

        Replies whose sequence number does not match the request are skipped.
        Raises ValueError on a non-zero error reply or an unexpected
        message type.
        """
        msg = self._new_request(NlRtMsgType.RTM_GETLINK)
        msg.add_nla(NlAttrStr(IflattrType.IFLA_IFNAME, ifname))
        self.write_message(msg)
        while True:
            rx_msg = self.read_message()
            if msg.nl_hdr.nlmsg_seq != rx_msg.nl_hdr.nlmsg_seq:
                continue
            if rx_msg.is_type(NlMsgType.NLMSG_ERROR):
                # A zero error code is a plain ACK: keep reading.
                if rx_msg.error_code != 0:
                    raise ValueError("unable to get interface {}".format(ifname))
            elif rx_msg.is_type(NlRtMsgType.RTM_NEWLINK):
                return rx_msg
            else:
                raise ValueError("bad message")

    def test_get_iface_byname_error(self):
        """Tests error on fetching non-existing interface name"""
        msg = self._new_request(NlRtMsgType.RTM_GETLINK)
        msg.add_nla(NlAttrStr(IflattrType.IFLA_IFNAME, "lo10"))

        self._assert_errno(self.get_reply(msg), errno.ENODEV)

    def test_get_iface_byindex_error(self):
        """Tests error on fetching non-existing interface index"""
        msg = self._new_request(NlRtMsgType.RTM_GETLINK)
        msg.base_hdr.ifi_index = 2147483647

        self._assert_errno(self.get_reply(msg), errno.ENODEV)

    @pytest.mark.require_user("root")
    def test_create_iface_plain(self):
        """Tests loopback creation w/o any parameters"""
        rx_msg = self.get_reply(self._new_loopback_request("lo10"))
        self._assert_errno(rx_msg)

        self.get_interface_byname("lo10")

    @pytest.mark.require_user("root")
    def test_create_iface_plain_retvals(self):
        """Tests that loopback creation returns the new name and ifindex"""
        rx_msg = self.get_reply(self._new_loopback_request("lo10"))
        self._assert_errno(rx_msg)

        nla_map = self._parse_cookie_attrs(rx_msg)
        assert IflattrType.IFLA_IFNAME.value in nla_map
        assert nla_map[IflattrType.IFLA_IFNAME.value].text == "lo10"
        assert IflattrType.IFLA_NEW_IFINDEX.value in nla_map
        new_ifindex = nla_map[IflattrType.IFLA_NEW_IFINDEX.value].u32
        assert new_ifindex > 0

        lo_msg = self.get_interface_byname("lo10")
        assert lo_msg.base_hdr.ifi_index == new_ifindex

    @pytest.mark.require_user("root")
    def test_create_iface_attrs(self):
        """Tests interface creation with additional properties"""
        msg = self._new_loopback_request("lo10")

        # Custom attributes
        msg.add_nla(NlAttrStr(IflattrType.IFLA_IFALIAS, "test description"))
        msg.add_nla(NlAttrU32(IflattrType.IFLA_MTU, 1024))

        self._assert_errno(self.get_reply(msg))

        iface_msg = self.get_interface_byname("lo10")
        assert iface_msg.get_nla(IflattrType.IFLA_IFALIAS).text == "test description"
        assert iface_msg.get_nla(IflattrType.IFLA_MTU).u32 == 1024

    @pytest.mark.require_user("root")
    def test_modify_iface_attrs(self):
        """Tests interface modifications"""
        self._assert_errno(self.get_reply(self._new_loopback_request("lo10")))

        # Same message type as creation, but without EXCL|CREATE flags.
        msg = self._new_request(NlRtMsgType.RTM_NEWLINK)
        msg.add_nla(NlAttrStr(IflattrType.IFLA_IFNAME, "lo10"))

        # Custom attributes
        msg.add_nla(NlAttrStr(IflattrType.IFLA_IFALIAS, "test description"))
        msg.add_nla(NlAttrU32(IflattrType.IFLA_MTU, 1024))

        self._assert_errno(self.get_reply(msg))

        iface_msg = self.get_interface_byname("lo10")
        assert iface_msg.get_nla(IflattrType.IFLA_IFALIAS).text == "test description"
        assert iface_msg.get_nla(IflattrType.IFLA_MTU).u32 == 1024

    @pytest.mark.require_user("root")
    def test_delete_iface(self):
        """Tests interface deletion"""
        self._assert_errno(self.get_reply(self._new_loopback_request("lo10")))

        iface_msg = self.get_interface_byname("lo10")
        iface_idx = iface_msg.base_hdr.ifi_index

        # The interface is deleted by index, not by name.
        msg = self._new_request(NlRtMsgType.RTM_DELLINK)
        msg.base_hdr.ifi_index = iface_idx
        self._assert_errno(self.get_reply(msg))

        # Look up the index that was just deleted: querying an arbitrary
        # unused index would yield ENODEV even if the deletion did nothing.
        msg = self._new_request(NlRtMsgType.RTM_GETLINK)
        msg.base_hdr.ifi_index = iface_idx
        self._assert_errno(self.get_reply(msg), errno.ENODEV)

    @pytest.mark.require_user("root")
    def test_dump_ifaces_many(self):
        """Tests if interface dump is not missing interfaces"""

        ifmap = {socket.if_nametoindex("lo0"): "lo0"}

        for i in range(40):
            ifname = "lo{}".format(i + 1)
            rx_msg = self.get_reply(self._new_loopback_request(ifname))
            self._assert_errno(rx_msg)

            nla_map = self._parse_cookie_attrs(rx_msg)
            assert nla_map[IflattrType.IFLA_IFNAME.value].text == ifname
            ifindex = nla_map[IflattrType.IFLA_NEW_IFINDEX.value].u32
            assert ifindex > 0
            assert ifindex not in ifmap
            ifmap[ifindex] = ifname

        # Dump all interfaces and check if the output matches ifmap
        kernel_ifmap = {}
        msg = self._new_request(NlRtMsgType.RTM_GETLINK)
        self.write_message(msg)
        while True:
            rx_msg = self.read_message()
            if msg.nl_hdr.nlmsg_seq != rx_msg.nl_hdr.nlmsg_seq:
                raise ValueError(
                    "unexpected seq {}".format(rx_msg.nl_hdr.nlmsg_seq)
                )
            if rx_msg.is_type(NlMsgType.NLMSG_ERROR):
                raise ValueError("unexpected message {}".format(rx_msg))
            if rx_msg.is_type(NlMsgType.NLMSG_DONE):
                break
            if not rx_msg.is_type(NlRtMsgType.RTM_NEWLINK):
                raise ValueError("unexpected message {}".format(rx_msg))

            ifname = rx_msg.get_nla(IflattrType.IFLA_IFNAME).text
            # Only loopbacks are tracked in ifmap, so ignore everything else.
            if ifname.startswith("lo"):
                kernel_ifmap[rx_msg.base_hdr.ifi_index] = ifname
        assert kernel_ifmap == ifmap

    # Sample request kept for reference:
    #
    # {len=76, type=RTM_NEWLINK, flags=NLM_F_REQUEST|NLM_F_ACK|NLM_F_EXCL|NLM_F_CREATE, seq=1662892737, pid=0},
    # {ifi_family=AF_UNSPEC, ifi_type=ARPHRD_NETROM, ifi_index=0, ifi_flags=0, ifi_change=0},
    # {{nla_len=8, nla_type=IFLA_LINK}, 2},
    # {{nla_len=12, nla_type=IFLA_IFNAME}, "xvlan22"},
    # {{nla_len=24, nla_type=IFLA_LINKINFO},
    #   {{nla_len=8, nla_type=IFLA_INFO_KIND}, "vlan"...},
    #   {{nla_len=12, nla_type=IFLA_INFO_DATA}, "\x06\x00\x01\x00\x16\x00\x00\x00"}
    @pytest.mark.skip(reason="vlan support needs more work")
    @pytest.mark.require_user("root")
    def test_create_vlan_plain(self):
        """Creates 802.1Q VLAN interface in vlanXX and ifX fashion"""
        os_ifname = self.vnet.iface_alias_map["if1"].name
        ifindex = socket.if_nametoindex(os_ifname)

        create_flags = NlmNewFlags.NLM_F_EXCL.value | NlmNewFlags.NLM_F_CREATE.value
        msg = self._new_request(NlRtMsgType.RTM_NEWLINK, create_flags)

        msg.add_nla(NlAttrU32(IflattrType.IFLA_LINK, ifindex))
        msg.add_nla(NlAttrStr(IflattrType.IFLA_IFNAME, "vlan22"))

        msg.add_nla(
            NlAttrNested(
                IflattrType.IFLA_LINKINFO,
                [
                    NlAttrStrn(IflinkInfo.IFLA_INFO_KIND, "vlan"),
                    NlAttrNested(
                        IflinkInfo.IFLA_INFO_DATA,
                        [
                            NlAttrU16(IfLinkInfoDataVlan.IFLA_VLAN_ID, 22),
                        ],
                    ),
                ],
            )
        )

        self._assert_errno(self.get_reply(msg))

        self.get_interface_byname("vlan22")