1import errno 2import socket 3 4import pytest 5from atf_python.sys.netlink.netlink_route import IflattrType 6from atf_python.sys.netlink.netlink_route import IflinkInfo 7from atf_python.sys.netlink.netlink_route import IfLinkInfoDataVlan 8from atf_python.sys.netlink.netlink_route import NetlinkIflaMessage 9from atf_python.sys.netlink.netlink import NetlinkTestTemplate 10from atf_python.sys.netlink.attrs import NlAttrNested 11from atf_python.sys.netlink.attrs import NlAttrStr 12from atf_python.sys.netlink.attrs import NlAttrStrn 13from atf_python.sys.netlink.attrs import NlAttrU16 14from atf_python.sys.netlink.attrs import NlAttrU32 15from atf_python.sys.netlink.utils import NlConst 16from atf_python.sys.netlink.base_headers import NlmBaseFlags 17from atf_python.sys.netlink.base_headers import NlmNewFlags 18from atf_python.sys.netlink.base_headers import NlMsgType 19from atf_python.sys.netlink.netlink_route import NlRtMsgType 20from atf_python.sys.netlink.netlink_route import rtnl_ifla_attrs 21from atf_python.sys.net.vnet import SingleVnetTestTemplate 22from atf_python.sys.net.tools import ToolsHelper 23 24 25class TestRtNlIface(NetlinkTestTemplate, SingleVnetTestTemplate): 26 def setup_method(self, method): 27 super().setup_method(method) 28 self.setup_netlink(NlConst.NETLINK_ROUTE) 29 30 def get_interface_byname(self, ifname): 31 msg = NetlinkIflaMessage(self.helper, NlRtMsgType.RTM_GETLINK.value) 32 msg.nl_hdr.nlmsg_flags = ( 33 NlmBaseFlags.NLM_F_ACK.value | NlmBaseFlags.NLM_F_REQUEST.value 34 ) 35 msg.add_nla(NlAttrStr(IflattrType.IFLA_IFNAME, ifname)) 36 self.write_message(msg) 37 while True: 38 rx_msg = self.read_message() 39 if msg.nl_hdr.nlmsg_seq == rx_msg.nl_hdr.nlmsg_seq: 40 if rx_msg.is_type(NlMsgType.NLMSG_ERROR): 41 if rx_msg.error_code != 0: 42 raise ValueError("unable to get interface {}".format(ifname)) 43 elif rx_msg.is_type(NlRtMsgType.RTM_NEWLINK): 44 return rx_msg 45 else: 46 raise ValueError("bad message") 47 48 def test_get_iface_byname_error(self): 49 """Tests error on fetching non-existing interface name""" 50 msg = NetlinkIflaMessage(self.helper, NlRtMsgType.RTM_GETLINK.value) 51 msg.nl_hdr.nlmsg_flags = ( 52 NlmBaseFlags.NLM_F_ACK.value | NlmBaseFlags.NLM_F_REQUEST.value 53 ) 54 msg.add_nla(NlAttrStr(IflattrType.IFLA_IFNAME, "lo10")) 55 56 rx_msg = self.get_reply(msg) 57 assert rx_msg.is_type(NlMsgType.NLMSG_ERROR) 58 assert rx_msg.error_code == errno.ENODEV 59 60 def test_get_iface_byindex_error(self): 61 """Tests error on fetching non-existing interface index""" 62 msg = NetlinkIflaMessage(self.helper, NlRtMsgType.RTM_GETLINK.value) 63 msg.nl_hdr.nlmsg_flags = ( 64 NlmBaseFlags.NLM_F_ACK.value | NlmBaseFlags.NLM_F_REQUEST.value 65 ) 66 msg.base_hdr.ifi_index = 2147483647 67 68 rx_msg = self.get_reply(msg) 69 assert rx_msg.is_type(NlMsgType.NLMSG_ERROR) 70 assert rx_msg.error_code == errno.ENODEV 71 72 @pytest.mark.require_user("root") 73 def test_create_iface_plain(self): 74 """Tests loopback creation w/o any parameters""" 75 flags = NlmNewFlags.NLM_F_EXCL.value | NlmNewFlags.NLM_F_CREATE.value 76 msg = NetlinkIflaMessage(self.helper, NlRtMsgType.RTM_NEWLINK.value) 77 msg.nl_hdr.nlmsg_flags = ( 78 flags | NlmBaseFlags.NLM_F_ACK.value | NlmBaseFlags.NLM_F_REQUEST.value 79 ) 80 msg.add_nla(NlAttrStr(IflattrType.IFLA_IFNAME, "lo10")) 81 msg.add_nla( 82 NlAttrNested( 83 IflattrType.IFLA_LINKINFO, 84 [ 85 NlAttrStrn(IflinkInfo.IFLA_INFO_KIND, "lo"), 86 ], 87 ) 88 ) 89 90 rx_msg = self.get_reply(msg) 91 assert rx_msg.is_type(NlMsgType.NLMSG_ERROR) 92 assert rx_msg.error_code == 0 93 94 self.get_interface_byname("lo10") 95 96 @pytest.mark.require_user("root") 97 def test_create_iface_plain_retvals(self): 98 """Tests loopback creation w/o any parameters""" 99 flags = NlmNewFlags.NLM_F_EXCL.value | NlmNewFlags.NLM_F_CREATE.value 100 msg = NetlinkIflaMessage(self.helper, NlRtMsgType.RTM_NEWLINK.value) 101 msg.nl_hdr.nlmsg_flags = ( 102 flags | NlmBaseFlags.NLM_F_ACK.value | NlmBaseFlags.NLM_F_REQUEST.value 103 ) 104 msg.add_nla(NlAttrStr(IflattrType.IFLA_IFNAME, "lo10")) 105 msg.add_nla( 106 NlAttrNested( 107 IflattrType.IFLA_LINKINFO, 108 [ 109 NlAttrStrn(IflinkInfo.IFLA_INFO_KIND, "lo"), 110 ], 111 ) 112 ) 113 114 rx_msg = self.get_reply(msg) 115 assert rx_msg.is_type(NlMsgType.NLMSG_ERROR) 116 assert rx_msg.error_code == 0 117 assert rx_msg.cookie is not None 118 nla_list, _ = rx_msg.parse_attrs(bytes(rx_msg.cookie)[4:], rtnl_ifla_attrs) 119 nla_map = {n.nla_type: n for n in nla_list} 120 assert IflattrType.IFLA_IFNAME.value in nla_map 121 assert nla_map[IflattrType.IFLA_IFNAME.value].text == "lo10" 122 assert IflattrType.IFLA_NEW_IFINDEX.value in nla_map 123 assert nla_map[IflattrType.IFLA_NEW_IFINDEX.value].u32 > 0 124 125 lo_msg = self.get_interface_byname("lo10") 126 assert ( 127 lo_msg.base_hdr.ifi_index == nla_map[IflattrType.IFLA_NEW_IFINDEX.value].u32 128 ) 129 130 @pytest.mark.require_user("root") 131 def test_create_iface_attrs(self): 132 """Tests interface creation with additional properties""" 133 flags = NlmNewFlags.NLM_F_EXCL.value | NlmNewFlags.NLM_F_CREATE.value 134 msg = NetlinkIflaMessage(self.helper, NlRtMsgType.RTM_NEWLINK.value) 135 msg.nl_hdr.nlmsg_flags = ( 136 flags | NlmBaseFlags.NLM_F_ACK.value | NlmBaseFlags.NLM_F_REQUEST.value 137 ) 138 msg.add_nla(NlAttrStr(IflattrType.IFLA_IFNAME, "lo10")) 139 msg.add_nla( 140 NlAttrNested( 141 IflattrType.IFLA_LINKINFO, 142 [ 143 NlAttrStrn(IflinkInfo.IFLA_INFO_KIND, "lo"), 144 ], 145 ) 146 ) 147 148 # Custom attributes 149 msg.add_nla(NlAttrStr(IflattrType.IFLA_IFALIAS, "test description")) 150 msg.add_nla(NlAttrU32(IflattrType.IFLA_MTU, 1024)) 151 152 rx_msg = self.get_reply(msg) 153 assert rx_msg.is_type(NlMsgType.NLMSG_ERROR) 154 assert rx_msg.error_code == 0 155 156 iface_msg = self.get_interface_byname("lo10") 157 assert iface_msg.get_nla(IflattrType.IFLA_IFALIAS).text == "test description" 158 assert iface_msg.get_nla(IflattrType.IFLA_MTU).u32 == 1024 159 160 @pytest.mark.require_user("root") 161 def test_modify_iface_attrs(self): 162 """Tests interface modifications""" 163 flags = NlmNewFlags.NLM_F_EXCL.value | NlmNewFlags.NLM_F_CREATE.value 164 msg = NetlinkIflaMessage(self.helper, NlRtMsgType.RTM_NEWLINK.value) 165 msg.nl_hdr.nlmsg_flags = ( 166 flags | NlmBaseFlags.NLM_F_ACK.value | NlmBaseFlags.NLM_F_REQUEST.value 167 ) 168 msg.add_nla(NlAttrStr(IflattrType.IFLA_IFNAME, "lo10")) 169 msg.add_nla( 170 NlAttrNested( 171 IflattrType.IFLA_LINKINFO, 172 [ 173 NlAttrStrn(IflinkInfo.IFLA_INFO_KIND, "lo"), 174 ], 175 ) 176 ) 177 178 rx_msg = self.get_reply(msg) 179 assert rx_msg.is_type(NlMsgType.NLMSG_ERROR) 180 assert rx_msg.error_code == 0 181 182 msg = NetlinkIflaMessage(self.helper, NlRtMsgType.RTM_NEWLINK.value) 183 msg.nl_hdr.nlmsg_flags = ( 184 NlmBaseFlags.NLM_F_ACK.value | NlmBaseFlags.NLM_F_REQUEST.value 185 ) 186 msg.add_nla(NlAttrStr(IflattrType.IFLA_IFNAME, "lo10")) 187 188 # Custom attributes 189 msg.add_nla(NlAttrStr(IflattrType.IFLA_IFALIAS, "test description")) 190 msg.add_nla(NlAttrU32(IflattrType.IFLA_MTU, 1024)) 191 192 rx_msg = self.get_reply(msg) 193 assert rx_msg.is_type(NlMsgType.NLMSG_ERROR) 194 assert rx_msg.error_code == 0 195 196 iface_msg = self.get_interface_byname("lo10") 197 assert iface_msg.get_nla(IflattrType.IFLA_IFALIAS).text == "test description" 198 assert iface_msg.get_nla(IflattrType.IFLA_MTU).u32 == 1024 199 200 @pytest.mark.require_user("root") 201 def test_delete_iface(self): 202 """Tests interface modifications""" 203 flags = NlmNewFlags.NLM_F_EXCL.value | NlmNewFlags.NLM_F_CREATE.value 204 msg = NetlinkIflaMessage(self.helper, NlRtMsgType.RTM_NEWLINK.value) 205 msg.nl_hdr.nlmsg_flags = ( 206 flags | NlmBaseFlags.NLM_F_ACK.value | NlmBaseFlags.NLM_F_REQUEST.value 207 ) 208 msg.add_nla(NlAttrStr(IflattrType.IFLA_IFNAME, "lo10")) 209 msg.add_nla( 210 NlAttrNested( 211 IflattrType.IFLA_LINKINFO, 212 [ 213 NlAttrStrn(IflinkInfo.IFLA_INFO_KIND, "lo"), 214 ], 215 ) 216 ) 217 218 rx_msg = self.get_reply(msg) 219 assert rx_msg.is_type(NlMsgType.NLMSG_ERROR) 220 assert rx_msg.error_code == 0 221 222 iface_msg = self.get_interface_byname("lo10") 223 iface_idx = iface_msg.base_hdr.ifi_index 224 225 msg = NetlinkIflaMessage(self.helper, NlRtMsgType.RTM_DELLINK.value) 226 msg.nl_hdr.nlmsg_flags = ( 227 NlmBaseFlags.NLM_F_ACK.value | NlmBaseFlags.NLM_F_REQUEST.value 228 ) 229 msg.base_hdr.ifi_index = iface_idx 230 # msg.add_nla(NlAttrStr(IflattrType.IFLA_IFNAME, "lo10")) 231 232 rx_msg = self.get_reply(msg) 233 assert rx_msg.is_type(NlMsgType.NLMSG_ERROR) 234 assert rx_msg.error_code == 0 235 236 msg = NetlinkIflaMessage(self.helper, NlRtMsgType.RTM_GETLINK.value) 237 msg.nl_hdr.nlmsg_flags = ( 238 NlmBaseFlags.NLM_F_ACK.value | NlmBaseFlags.NLM_F_REQUEST.value 239 ) 240 msg.base_hdr.ifi_index = 2147483647 241 242 rx_msg = self.get_reply(msg) 243 assert rx_msg.is_type(NlMsgType.NLMSG_ERROR) 244 assert rx_msg.error_code == errno.ENODEV 245 246 @pytest.mark.require_user("root") 247 def test_dump_ifaces_many(self): 248 """Tests if interface dummp is not missing interfaces""" 249 250 ifmap = {} 251 ifmap[socket.if_nametoindex("lo0")] = "lo0" 252 253 for i in range(40): 254 ifname = "lo{}".format(i + 1) 255 flags = NlmNewFlags.NLM_F_EXCL.value | NlmNewFlags.NLM_F_CREATE.value 256 msg = NetlinkIflaMessage(self.helper, NlRtMsgType.RTM_NEWLINK.value) 257 msg.nl_hdr.nlmsg_flags = ( 258 flags | NlmBaseFlags.NLM_F_ACK.value | NlmBaseFlags.NLM_F_REQUEST.value 259 ) 260 msg.add_nla(NlAttrStr(IflattrType.IFLA_IFNAME, ifname)) 261 msg.add_nla( 262 NlAttrNested( 263 IflattrType.IFLA_LINKINFO, 264 [ 265 NlAttrStrn(IflinkInfo.IFLA_INFO_KIND, "lo"), 266 ], 267 ) 268 ) 269 270 rx_msg = self.get_reply(msg) 271 assert rx_msg.is_type(NlMsgType.NLMSG_ERROR) 272 nla_list, _ = rx_msg.parse_attrs(bytes(rx_msg.cookie)[4:], rtnl_ifla_attrs) 273 nla_map = {n.nla_type: n for n in nla_list} 274 assert nla_map[IflattrType.IFLA_IFNAME.value].text == ifname 275 ifindex = nla_map[IflattrType.IFLA_NEW_IFINDEX.value].u32 276 assert ifindex > 0 277 assert ifindex not in ifmap 278 ifmap[ifindex] = ifname 279 280 # Dump all interfaces and check if the output matches ifmap 281 kernel_ifmap = {} 282 msg = NetlinkIflaMessage(self.helper, NlRtMsgType.RTM_GETLINK.value) 283 msg.nl_hdr.nlmsg_flags = ( 284 NlmBaseFlags.NLM_F_ACK.value | NlmBaseFlags.NLM_F_REQUEST.value 285 ) 286 self.write_message(msg) 287 while True: 288 rx_msg = self.read_message() 289 if msg.nl_hdr.nlmsg_seq != rx_msg.nl_hdr.nlmsg_seq: 290 raise ValueError( 291 "unexpected seq {}".format(rx_msg.nl_hdr.nlmsg_seq) 292 ) 293 if rx_msg.is_type(NlMsgType.NLMSG_ERROR): 294 raise ValueError("unexpected message {}".format(rx_msg)) 295 if rx_msg.is_type(NlMsgType.NLMSG_DONE): 296 break 297 if not rx_msg.is_type(NlRtMsgType.RTM_NEWLINK): 298 raise ValueError("unexpected message {}".format(rx_msg)) 299 300 ifindex = rx_msg.base_hdr.ifi_index 301 assert ifindex == rx_msg.base_hdr.ifi_index 302 ifname = rx_msg.get_nla(IflattrType.IFLA_IFNAME).text 303 if ifname.startswith("lo"): 304 kernel_ifmap[ifindex] = ifname 305 assert kernel_ifmap == ifmap 306 307 # 308 # * 309 # * {len=76, type=RTM_NEWLINK, flags=NLM_F_REQUEST|NLM_F_ACK|NLM_F_EXCL|NLM_F_CREATE, seq=1662892737, pid=0}, 310 # * {ifi_family=AF_UNSPEC, ifi_type=ARPHRD_NETROM, ifi_index=0, ifi_flags=0, ifi_change=0}, 311 # * {{nla_len=8, nla_type=IFLA_LINK}, 2}, 312 # * {{nla_len=12, nla_type=IFLA_IFNAME}, "xvlan22"}, 313 # * {{nla_len=24, nla_type=IFLA_LINKINFO}, 314 # * {{nla_len=8, nla_type=IFLA_INFO_KIND}, "vlan"...}, 315 # * {{nla_len=12, nla_type=IFLA_INFO_DATA}, "\x06\x00\x01\x00\x16\x00\x00\x00"} 316 # */ 317 @pytest.mark.require_user("root") 318 def test_create_vlan_plain(self): 319 """Creates 802.1Q VLAN interface in vlanXX and ifX fashion""" 320 self.require_module("if_vlan") 321 os_ifname = self.vnet.iface_alias_map["if1"].name 322 ifindex = socket.if_nametoindex(os_ifname) 323 324 flags = NlmNewFlags.NLM_F_EXCL.value | NlmNewFlags.NLM_F_CREATE.value 325 msg = NetlinkIflaMessage(self.helper, NlRtMsgType.RTM_NEWLINK.value) 326 msg.nl_hdr.nlmsg_flags = ( 327 flags | NlmBaseFlags.NLM_F_ACK.value | NlmBaseFlags.NLM_F_REQUEST.value 328 ) 329 msg.base_hdr.ifi_index = ifindex 330 331 msg.add_nla(NlAttrU32(IflattrType.IFLA_LINK, ifindex)) 332 msg.add_nla(NlAttrStr(IflattrType.IFLA_IFNAME, "vlan22")) 333 334 msg.add_nla( 335 NlAttrNested( 336 IflattrType.IFLA_LINKINFO, 337 [ 338 NlAttrStrn(IflinkInfo.IFLA_INFO_KIND, "vlan"), 339 NlAttrNested( 340 IflinkInfo.IFLA_INFO_DATA, 341 [ 342 NlAttrU16(IfLinkInfoDataVlan.IFLA_VLAN_ID, 22), 343 ], 344 ), 345 ], 346 ) 347 ) 348 349 rx_msg = self.get_reply(msg) 350 assert rx_msg.is_type(NlMsgType.NLMSG_ERROR) 351 assert rx_msg.error_code == 0 352 353 ToolsHelper.print_net_debug() 354 self.get_interface_byname("vlan22") 355 # ToolsHelper.print_net_debug() 356