import errno
import socket

import pytest
from atf_python.sys.net.netlink import IflattrType
from atf_python.sys.net.netlink import IflinkInfo
from atf_python.sys.net.netlink import IfLinkInfoDataVlan
from atf_python.sys.net.netlink import NetlinkIflaMessage
from atf_python.sys.net.netlink import NlAttrNested
from atf_python.sys.net.netlink import NlAttrStr
from atf_python.sys.net.netlink import NlAttrStrn
from atf_python.sys.net.netlink import NlAttrU16
from atf_python.sys.net.netlink import NlAttrU32
from atf_python.sys.net.netlink import NlConst
from atf_python.sys.net.netlink import NlHelper
from atf_python.sys.net.netlink import NlmBaseFlags
from atf_python.sys.net.netlink import NlmNewFlags
from atf_python.sys.net.netlink import NlMsgType
from atf_python.sys.net.netlink import NlRtMsgType
from atf_python.sys.net.netlink import Nlsock
from atf_python.sys.net.vnet import SingleVnetTestTemplate


class TestRtNlIface(SingleVnetTestTemplate):
    """Exercise RTM_GETLINK / RTM_NEWLINK / RTM_DELLINK over a NETLINK_ROUTE socket.

    Every test talks to the socket opened in ``setup_method``.  Requests are
    always sent with NLM_F_REQUEST | NLM_F_ACK, so each one is answered by
    (at least) an NLMSG_ERROR message whose ``error_code`` is checked.
    """

    REQUIRED_MODULES = ["netlink"]

    def setup_method(self, method):
        """Run the base-class setup, then open the NETLINK_ROUTE socket."""
        super().setup_method(method)
        self.helper = NlHelper()
        self.nlsock = Nlsock(NlConst.NETLINK_ROUTE, self.helper)

    # ------------------------------------------------------------------
    # Socket I/O helpers
    # ------------------------------------------------------------------

    def write_message(self, msg):
        """Dump ``msg`` to stdout and send it on the netlink socket."""
        print("")
        print("============= >> TX MESSAGE =============")
        msg.print_message()
        # Serialise once (after print_message(), as before) and reuse the
        # buffer for both the socket write and the hex dump.
        data = bytes(msg)
        self.nlsock.write_data(data)
        msg.print_as_bytes(data, "-- DATA --")

    def read_message(self):
        """Read one message from the netlink socket, dump it and return it."""
        msg = self.nlsock.read_message()
        print("")
        print("============= << RX MESSAGE =============")
        msg.print_message()
        return msg

    def get_reply(self, tx_msg):
        """Send ``tx_msg`` and return the first message carrying its sequence number.

        Messages with a different ``nlmsg_seq`` are read and discarded.
        """
        self.write_message(tx_msg)
        while True:
            rx_msg = self.read_message()
            if tx_msg.nl_hdr.nlmsg_seq == rx_msg.nl_hdr.nlmsg_seq:
                return rx_msg

    def get_interface_byname(self, ifname):
        """Return the RTM_NEWLINK message describing interface ``ifname``.

        Raises:
            ValueError: if the request is answered with a non-zero error
                code, or with a message that is neither NLMSG_ERROR nor
                RTM_NEWLINK.
        """
        msg = self._make_request(NlRtMsgType.RTM_GETLINK)
        msg.add_nla(NlAttrStr(IflattrType.IFLA_IFNAME, ifname))
        self.write_message(msg)
        while True:
            rx_msg = self.read_message()
            if msg.nl_hdr.nlmsg_seq != rx_msg.nl_hdr.nlmsg_seq:
                # Not an answer to our request; keep reading.
                continue
            if rx_msg.is_type(NlMsgType.NLMSG_ERROR):
                if rx_msg.error_code != 0:
                    raise ValueError(f"unable to get interface {ifname}")
                # error_code == 0 is a plain ACK; keep waiting for the data.
            elif rx_msg.is_type(NlRtMsgType.RTM_NEWLINK):
                return rx_msg
            else:
                raise ValueError("bad message")

    # ------------------------------------------------------------------
    # Request-building / assertion helpers (not collected by pytest)
    # ------------------------------------------------------------------

    def _make_request(self, msg_type, extra_flags=0):
        """Build an interface message of ``msg_type`` flagged REQUEST | ACK.

        Args:
            msg_type: ``NlRtMsgType`` member selecting the operation.
            extra_flags: additional ``nlmsg_flags`` bits OR-ed into the header.
        """
        msg = NetlinkIflaMessage(self.helper, msg_type.value)
        msg.nl_hdr.nlmsg_flags = (
            extra_flags
            | NlmBaseFlags.NLM_F_ACK.value
            | NlmBaseFlags.NLM_F_REQUEST.value
        )
        return msg

    def _make_create_request(self):
        """Build an RTM_NEWLINK request flagged NLM_F_EXCL | NLM_F_CREATE."""
        flags = NlmNewFlags.NLM_F_EXCL.value | NlmNewFlags.NLM_F_CREATE.value
        return self._make_request(NlRtMsgType.RTM_NEWLINK, flags)

    def _make_loopback_request(self, ifname="lo10"):
        """Build the request creating an interface ``ifname`` of kind ``lo``.

        The caller may append further attributes before sending it.
        """
        msg = self._make_create_request()
        msg.add_nla(NlAttrStr(IflattrType.IFLA_IFNAME, ifname))
        msg.add_nla(
            NlAttrNested(
                IflattrType.IFLA_LINKINFO,
                [
                    NlAttrStrn(IflinkInfo.IFLA_INFO_KIND, "lo"),
                ],
            )
        )
        return msg

    def _check_reply(self, tx_msg, error_code=0):
        """Send ``tx_msg`` and assert it is answered by NLMSG_ERROR ``error_code``.

        ``error_code`` 0 (the default) means a successful acknowledgement.
        """
        rx_msg = self.get_reply(tx_msg)
        assert rx_msg.is_type(NlMsgType.NLMSG_ERROR)
        assert rx_msg.error_code == error_code
        return rx_msg

    # ------------------------------------------------------------------
    # Tests
    # ------------------------------------------------------------------

    def test_get_iface_byname_error(self):
        """Tests error on fetching non-existing interface name"""
        msg = self._make_request(NlRtMsgType.RTM_GETLINK)
        msg.add_nla(NlAttrStr(IflattrType.IFLA_IFNAME, "lo10"))

        self._check_reply(msg, errno.ENODEV)

    def test_get_iface_byindex_error(self):
        """Tests error on fetching non-existing interface index"""
        msg = self._make_request(NlRtMsgType.RTM_GETLINK)
        msg.base_hdr.ifi_index = 2147483647

        self._check_reply(msg, errno.ENODEV)

    @pytest.mark.require_user("root")
    def test_create_iface_plain(self):
        """Tests loopback creation w/o any parameters"""
        self._check_reply(self._make_loopback_request("lo10"))

        self.get_interface_byname("lo10")

    @pytest.mark.require_user("root")
    def test_create_iface_attrs(self):
        """Tests interface creation with additional properties"""
        msg = self._make_loopback_request("lo10")

        # Custom attributes
        msg.add_nla(NlAttrStr(IflattrType.IFLA_IFALIAS, "test description"))
        msg.add_nla(NlAttrU32(IflattrType.IFLA_MTU, 1024))

        self._check_reply(msg)

        iface_msg = self.get_interface_byname("lo10")
        assert iface_msg.get_nla(IflattrType.IFLA_IFALIAS).text == "test description"
        assert iface_msg.get_nla(IflattrType.IFLA_MTU).u32 == 1024

    @pytest.mark.require_user("root")
    def test_modify_iface_attrs(self):
        """Tests interface modifications"""
        self._check_reply(self._make_loopback_request("lo10"))

        # Second RTM_NEWLINK without CREATE/EXCL: addresses the interface
        # created above by name and sets new attribute values on it.
        msg = self._make_request(NlRtMsgType.RTM_NEWLINK)
        msg.add_nla(NlAttrStr(IflattrType.IFLA_IFNAME, "lo10"))

        # Custom attributes
        msg.add_nla(NlAttrStr(IflattrType.IFLA_IFALIAS, "test description"))
        msg.add_nla(NlAttrU32(IflattrType.IFLA_MTU, 1024))

        self._check_reply(msg)

        iface_msg = self.get_interface_byname("lo10")
        assert iface_msg.get_nla(IflattrType.IFLA_IFALIAS).text == "test description"
        assert iface_msg.get_nla(IflattrType.IFLA_MTU).u32 == 1024

    @pytest.mark.require_user("root")
    def test_delete_iface(self):
        """Tests interface deletion"""
        self._check_reply(self._make_loopback_request("lo10"))

        iface_msg = self.get_interface_byname("lo10")
        iface_idx = iface_msg.base_hdr.ifi_index

        # The interface to delete is identified by index only.
        msg = self._make_request(NlRtMsgType.RTM_DELLINK)
        msg.base_hdr.ifi_index = iface_idx
        self._check_reply(msg)

        # Look up the index that was just deleted (not an arbitrary unused
        # one) so the check proves this particular interface is gone.
        msg = self._make_request(NlRtMsgType.RTM_GETLINK)
        msg.base_hdr.ifi_index = iface_idx
        self._check_reply(msg, errno.ENODEV)

    # Reference RTM_NEWLINK request for creating a vlan interface (this looks
    # like a captured sendmsg() trace; its origin is not recorded here):
    #
    #   {len=76, type=RTM_NEWLINK, flags=NLM_F_REQUEST|NLM_F_ACK|NLM_F_EXCL|NLM_F_CREATE, seq=1662892737, pid=0},
    #   {ifi_family=AF_UNSPEC, ifi_type=ARPHRD_NETROM, ifi_index=0, ifi_flags=0, ifi_change=0},
    #   [
    #     {{nla_len=8, nla_type=IFLA_LINK}, 2},
    #     {{nla_len=12, nla_type=IFLA_IFNAME}, "xvlan22"},
    #     {{nla_len=24, nla_type=IFLA_LINKINFO},
    #       [
    #         {{nla_len=8, nla_type=IFLA_INFO_KIND}, "vlan"...},
    #         {{nla_len=12, nla_type=IFLA_INFO_DATA}, "\x06\x00\x01\x00\x16\x00\x00\x00"}]}]}, iov_len=76}], msg_iovlen=1, msg_controllen=0, msg_flags=0}, 0) = 76
    @pytest.mark.skip(reason="vlan support needs more work")
    @pytest.mark.require_user("root")
    def test_create_vlan_plain(self):
        """Creates 802.1Q VLAN interface in vlanXX and ifX fashion"""
        os_ifname = self.vnet.iface_alias_map["if1"].name
        ifindex = socket.if_nametoindex(os_ifname)

        msg = self._make_create_request()

        msg.add_nla(NlAttrU32(IflattrType.IFLA_LINK, ifindex))
        msg.add_nla(NlAttrStr(IflattrType.IFLA_IFNAME, "vlan22"))

        msg.add_nla(
            NlAttrNested(
                IflattrType.IFLA_LINKINFO,
                [
                    NlAttrStrn(IflinkInfo.IFLA_INFO_KIND, "vlan"),
                    NlAttrNested(
                        IflinkInfo.IFLA_INFO_DATA,
                        [
                            NlAttrU16(IfLinkInfoDataVlan.IFLA_VLAN_ID, 22),
                        ],
                    ),
                ],
            )
        )

        self._check_reply(msg)

        self.get_interface_byname("vlan22")