import ipaddress
import socket
import struct

import pytest
from atf_python.sys.net.vnet import SingleVnetTestTemplate
from atf_python.sys.netlink.attrs import NlAttr
from atf_python.sys.netlink.attrs import NlAttrIp
from atf_python.sys.netlink.attrs import NlAttrNested
from atf_python.sys.netlink.attrs import NlAttrU32
from atf_python.sys.netlink.base_headers import NlmBaseFlags
from atf_python.sys.netlink.base_headers import NlmNewFlags
from atf_python.sys.netlink.base_headers import Nlmsghdr
from atf_python.sys.netlink.message import NlMsgType
from atf_python.sys.netlink.netlink import NetlinkTestTemplate
from atf_python.sys.netlink.netlink import Nlsock
from atf_python.sys.netlink.netlink_generic import CarpAttrType
from atf_python.sys.netlink.netlink_generic import CarpGenMessage
from atf_python.sys.netlink.netlink_generic import CarpMsgType
from atf_python.sys.netlink.netlink_route import IfaAttrType
from atf_python.sys.netlink.netlink_route import IfaCacheInfo
from atf_python.sys.netlink.netlink_route import IfafAttrType
from atf_python.sys.netlink.netlink_route import IfafFlags6
from atf_python.sys.netlink.netlink_route import IfaFlags
from atf_python.sys.netlink.netlink_route import NetlinkIfaMessage
from atf_python.sys.netlink.netlink_route import NlRtMsgType
from atf_python.sys.netlink.netlink_route import RtScope
from atf_python.sys.netlink.utils import enum_or_int
from atf_python.sys.netlink.utils import NlConst


class TestRtNlIfaddrList(NetlinkTestTemplate, SingleVnetTestTemplate):
    """Tests for listing interface addresses via RTM_GETADDR."""

    def setup_method(self, method):
        """Pick the address families to pre-configure from the test name.

        A "4" in the test method name requests the IPv4 prefix, a "6" the
        IPv6 prefix (so "test_46_*" gets both).  The prefix lists are set
        before the parent setup runs.
        NOTE(review): presumably the vnet template reads IPV4_PREFIXES /
        IPV6_PREFIXES to address the "if1" interface -- confirm.
        """
        method_name = method.__name__
        if "4" in method_name:
            self.IPV4_PREFIXES = ["192.0.2.1/24"]
        if "6" in method_name:
            self.IPV6_PREFIXES = ["2001:db8::1/64"]
        super().setup_method(method)
        self.setup_netlink(NlConst.NETLINK_ROUTE)

    def test_46_nofilter(self):
        """Tests that listing outputs both IPv4/IPv6 and interfaces"""
        msg = NetlinkIfaMessage(self.helper, NlRtMsgType.RTM_GETADDR.value)
        msg.set_request()
        self.write_message(msg)

        ret = []
        for rx_msg in self.read_msg_list(msg.nl_hdr.nlmsg_seq, NlRtMsgType.RTM_NEWADDR):
            ifname = socket.if_indextoname(rx_msg.base_hdr.ifa_index)
            family = rx_msg.base_hdr.ifa_family
            ret.append((ifname, family, rx_msg))

        ifname = "lo0"
        assert len([r for r in ret if r[0] == ifname]) > 0

        ifname = self.vnet.iface_alias_map["if1"].name
        assert len([r for r in ret if r[0] == ifname and r[1] == socket.AF_INET]) == 1
        # Two IPv6 entries: the configured prefix plus (presumably) the
        # automatically assigned link-local address.
        assert len([r for r in ret if r[0] == ifname and r[1] == socket.AF_INET6]) == 2

    def test_46_filter_iface(self):
        """Tests that listing outputs both IPv4/IPv6 for the specific interface"""
        epair_ifname = self.vnet.iface_alias_map["if1"].name

        msg = NetlinkIfaMessage(self.helper, NlRtMsgType.RTM_GETADDR.value)
        msg.set_request()
        msg.base_hdr.ifa_index = socket.if_nametoindex(epair_ifname)
        self.write_message(msg)

        ret = []
        for rx_msg in self.read_msg_list(msg.nl_hdr.nlmsg_seq, NlRtMsgType.RTM_NEWADDR):
            ifname = socket.if_indextoname(rx_msg.base_hdr.ifa_index)
            family = rx_msg.base_hdr.ifa_family
            ret.append((ifname, family, rx_msg))

        ifname = epair_ifname
        assert len([r for r in ret if r[0] == ifname and r[1] == socket.AF_INET]) == 1
        assert len([r for r in ret if r[0] == ifname and r[1] == socket.AF_INET6]) == 2
        # Nothing from any other interface may leak through the filter.
        assert len(ret) == 3

    def test_46_filter_family_compat(self):
        """Tests that family filtering works with the stripped header"""

        # The request body is a single family byte instead of a full
        # address-message header.
        # NOTE(review): nlmsg_len=17 is presumably the 16-byte netlink
        # header plus that one byte -- confirm against Nlmsghdr.
        hdr = Nlmsghdr(
            nlmsg_len=17,
            nlmsg_type=NlRtMsgType.RTM_GETADDR.value,
            nlmsg_flags=NlmBaseFlags.NLM_F_ACK.value | NlmBaseFlags.NLM_F_REQUEST.value,
            nlmsg_seq=self.helper.get_seq(),
        )
        data = bytes(hdr) + struct.pack("@B", socket.AF_INET)
        self.nlsock.write_data(data)

        ret = []
        for rx_msg in self.read_msg_list(hdr.nlmsg_seq, NlRtMsgType.RTM_NEWADDR):
            ifname = socket.if_indextoname(rx_msg.base_hdr.ifa_index)
            family = rx_msg.base_hdr.ifa_family
            ret.append((ifname, family, rx_msg))
        # NOTE(review): presumably one IPv4 address on lo0 and one on if1.
        assert len(ret) == 2

    def filter_iface_family(self, family, num_items):
        """List the addresses of the given family on the "if1" interface.

        Helper (not a test): sends RTM_GETADDR filtered by both `family` and
        the interface index, asserts that every reply matches the filter and
        that exactly `num_items` replies arrive, and returns the replies.
        """
        epair_ifname = self.vnet.iface_alias_map["if1"].name

        msg = NetlinkIfaMessage(self.helper, NlRtMsgType.RTM_GETADDR.value)
        msg.set_request()
        msg.base_hdr.ifa_family = family
        msg.base_hdr.ifa_index = socket.if_nametoindex(epair_ifname)
        self.write_message(msg)

        ret = []
        for rx_msg in self.read_msg_list(msg.nl_hdr.nlmsg_seq, NlRtMsgType.RTM_NEWADDR):
            assert family == rx_msg.base_hdr.ifa_family
            assert epair_ifname == socket.if_indextoname(rx_msg.base_hdr.ifa_index)
            ret.append(rx_msg)
        assert len(ret) == num_items
        return ret

    def test_4_broadcast(self):
        """Tests header/attr output for listing IPv4 ifas on broadcast iface"""
        ret = self.filter_iface_family(socket.AF_INET, 1)
        # Should be 192.0.2.1/24
        msg = ret[0]
        # Family and ifindex has been checked already
        assert msg.base_hdr.ifa_prefixlen == 24
        # Ignore IFA_FLAGS for now
        assert msg.base_hdr.ifa_scope == RtScope.RT_SCOPE_UNIVERSE.value

        assert msg.get_nla(IfaAttrType.IFA_ADDRESS).addr == "192.0.2.1"
        assert msg.get_nla(IfaAttrType.IFA_LOCAL).addr == "192.0.2.1"
        assert msg.get_nla(IfaAttrType.IFA_BROADCAST).addr == "192.0.2.255"

        epair_ifname = self.vnet.iface_alias_map["if1"].name
        assert msg.get_nla(IfaAttrType.IFA_LABEL).text == epair_ifname

    def test_6_broadcast(self):
        """Tests header/attr output for listing IPv6 ifas on broadcast iface"""
        ret = self.filter_iface_family(socket.AF_INET6, 2)
        # Should be 2001:db8::1/64 and a link-local address, in either order
        if ret[0].base_hdr.ifa_scope == RtScope.RT_SCOPE_UNIVERSE.value:
            (gmsg, lmsg) = ret
        else:
            (lmsg, gmsg) = ret
        # Start with global ( 2001:db8::1/64 )
        msg = gmsg
        # Family and ifindex has been checked already
        assert msg.base_hdr.ifa_prefixlen == 64
        # Ignore IFA_FLAGS for now
        assert msg.base_hdr.ifa_scope == RtScope.RT_SCOPE_UNIVERSE.value

        assert msg.get_nla(IfaAttrType.IFA_ADDRESS).addr == "2001:db8::1"
        assert msg.get_nla(IfaAttrType.IFA_LOCAL) is None
        assert msg.get_nla(IfaAttrType.IFA_BROADCAST) is None

        epair_ifname = self.vnet.iface_alias_map["if1"].name
        assert msg.get_nla(IfaAttrType.IFA_LABEL).text == epair_ifname

        # Local: fe80::/64
        msg = lmsg
        assert msg.base_hdr.ifa_prefixlen == 64
        # Ignore IFA_FLAGS for now
        assert msg.base_hdr.ifa_scope == RtScope.RT_SCOPE_LINK.value

        addr = ipaddress.ip_address(msg.get_nla(IfaAttrType.IFA_ADDRESS).addr)
        assert addr.is_link_local
        # Verify that ifindex is not embedded (bytes 2..3 must be zero)
        assert struct.unpack("!H", addr.packed[2:4])[0] == 0
        assert msg.get_nla(IfaAttrType.IFA_LOCAL) is None
        assert msg.get_nla(IfaAttrType.IFA_BROADCAST) is None

        epair_ifname = self.vnet.iface_alias_map["if1"].name
        assert msg.get_nla(IfaAttrType.IFA_LABEL).text == epair_ifname


class RtnlIfaOps(NetlinkTestTemplate, SingleVnetTestTemplate):
    """Shared helpers for the address add/delete test classes below.

    Not collected as a test class itself (name does not start with "Test").
    """

    def setup_method(self, method):
        """Run the parent setup and open a NETLINK_ROUTE socket."""
        super().setup_method(method)
        self.setup_netlink(NlConst.NETLINK_ROUTE)

    def send_check_success(self, msg):
        """Send `msg` and assert the reply is an NLMSG_ERROR with code 0."""
        rx_msg = self.get_reply(msg)
        assert rx_msg.is_type(NlMsgType.NLMSG_ERROR)
        assert rx_msg.error_code == 0

    @staticmethod
    def get_family_from_ip(ip):
        """Return AF_INET for a version-4 address object, AF_INET6 otherwise."""
        if ip.version == 4:
            return socket.AF_INET
        return socket.AF_INET6

    def create_msg(self, ifa):
        """Build an RTM_NEWADDR request (EXCL|CREATE) for `ifa` on "if1".

        `ifa` is an `ipaddress` interface object; only the header fields
        (family, ifindex, prefix length) are filled in here -- callers add
        the address attributes themselves.
        """
        iface = self.vnet.iface_alias_map["if1"]

        msg = NetlinkIfaMessage(self.helper, NlRtMsgType.RTM_NEWADDR.value)
        msg.set_request()
        msg.nl_hdr.nlmsg_flags |= (
            NlmNewFlags.NLM_F_EXCL.value | NlmNewFlags.NLM_F_CREATE.value
        )
        msg.base_hdr.ifa_family = self.get_family_from_ip(ifa.ip)
        msg.base_hdr.ifa_index = iface.ifindex
        msg.base_hdr.ifa_prefixlen = ifa.network.prefixlen
        return msg

    def get_ifa_list(self, ifindex=0, family=0):
        """Return the RTM_NEWADDR replies to an RTM_GETADDR request.

        `ifindex` and `family` are written into the request header; the
        defaults of 0 leave the corresponding filter unset.
        """
        msg = NetlinkIfaMessage(self.helper, NlRtMsgType.RTM_GETADDR.value)
        msg.set_request()
        msg.base_hdr.ifa_family = family
        msg.base_hdr.ifa_index = ifindex
        self.write_message(msg)
        return self.read_msg_list(msg.nl_hdr.nlmsg_seq, NlRtMsgType.RTM_NEWADDR)

    def find_msg_by_ifa(self, msg_list, ip):
        """Return the first message whose IFA_ADDRESS equals `ip`, or None.

        Messages without an IFA_ADDRESS attribute are skipped rather than
        raising AttributeError.
        """
        ip_str = str(ip)
        for msg in msg_list:
            nla = msg.get_nla(IfaAttrType.IFA_ADDRESS)
            if nla is not None and nla.addr == ip_str:
                return msg
        return None

    def setup_dummy_carp(self, ifindex: int, vhid: int):
        """Create CARP vhid `vhid` on interface `ifindex` via generic netlink.

        Requires the "carp" module and asserts that the request succeeds.
        """
        self.require_module("carp")

        nlsock = Nlsock(NlConst.NETLINK_GENERIC, self.helper)
        family_id = nlsock.get_genl_family_id("carp")

        msg = CarpGenMessage(self.helper, family_id, CarpMsgType.CARP_NL_CMD_SET)
        msg.set_request()
        msg.add_nla(NlAttrU32(CarpAttrType.CARP_NL_VHID, vhid))
        msg.add_nla(NlAttrU32(CarpAttrType.CARP_NL_IFINDEX, ifindex))
        rx_msg = nlsock.get_reply(msg)

        assert rx_msg.is_type(NlMsgType.NLMSG_ERROR)
        assert rx_msg.error_code == 0


class TestRtNlIfaddrOpsBroadcast(RtnlIfaOps):
    """Address add/delete tests on the default (broadcast) interface."""

    def test_add_4(self):
        """Tests IPv4 address addition to the standard broadcast interface"""
        ifa = ipaddress.ip_interface("192.0.2.1/24")
        ifa_brd = ifa.network.broadcast_address
        iface = self.vnet.iface_alias_map["if1"]

        msg = self.create_msg(ifa)
        msg.add_nla(NlAttrIp(IfaAttrType.IFA_LOCAL, str(ifa.ip)))
        msg.add_nla(NlAttrIp(IfaAttrType.IFA_BROADCAST, str(ifa_brd)))

        self.send_check_success(msg)

        lst = self.get_ifa_list(iface.ifindex, self.get_family_from_ip(ifa.ip))
        assert len(lst) == 1
        rx_msg = lst[0]

        assert rx_msg.base_hdr.ifa_prefixlen == ifa.network.prefixlen
        assert rx_msg.base_hdr.ifa_scope == RtScope.RT_SCOPE_UNIVERSE.value

        assert rx_msg.get_nla(IfaAttrType.IFA_ADDRESS).addr == str(ifa.ip)
        assert rx_msg.get_nla(IfaAttrType.IFA_LOCAL).addr == str(ifa.ip)
        assert rx_msg.get_nla(IfaAttrType.IFA_BROADCAST).addr == str(ifa_brd)

    @pytest.mark.parametrize(
        "brd",
        [
            pytest.param((32, True, "192.0.2.1"), id="auto_32"),
            pytest.param((31, True, "255.255.255.255"), id="auto_31"),
            pytest.param((30, True, "192.0.2.3"), id="auto_30"),
            pytest.param((30, False, "192.0.2.2"), id="custom_30"),
            pytest.param((24, False, "192.0.2.7"), id="custom_24"),
        ],
    )
    def test_add_4_brd(self, brd):
        """Tests proper broadcast setup when adding IPv4 ifa"""
        # brd is (prefix length, let the kernel pick the broadcast address?,
        # expected broadcast address).
        plen, auto_brd, ifa_brd_str = brd
        ifa = ipaddress.ip_interface("192.0.2.1/{}".format(plen))
        iface = self.vnet.iface_alias_map["if1"]
        ifa_brd = ipaddress.ip_address(ifa_brd_str)

        msg = self.create_msg(ifa)
        msg.add_nla(NlAttrIp(IfaAttrType.IFA_LOCAL, str(ifa.ip)))
        if not auto_brd:
            msg.add_nla(NlAttrIp(IfaAttrType.IFA_BROADCAST, str(ifa_brd)))

        self.send_check_success(msg)

        lst = self.get_ifa_list(iface.ifindex, self.get_family_from_ip(ifa.ip))
        assert len(lst) == 1
        rx_msg = lst[0]

        assert rx_msg.base_hdr.ifa_prefixlen == ifa.network.prefixlen
        assert rx_msg.base_hdr.ifa_scope == RtScope.RT_SCOPE_UNIVERSE.value

        assert rx_msg.get_nla(IfaAttrType.IFA_ADDRESS).addr == str(ifa.ip)
        assert rx_msg.get_nla(IfaAttrType.IFA_LOCAL).addr == str(ifa.ip)
        assert rx_msg.get_nla(IfaAttrType.IFA_BROADCAST).addr == str(ifa_brd)

    def test_add_6(self):
        """Tests IPv6 address addition to the standard broadcast interface"""
        ifa = ipaddress.ip_interface("2001:db8::1/64")
        iface = self.vnet.iface_alias_map["if1"]

        msg = self.create_msg(ifa)
        msg.add_nla(NlAttrIp(IfaAttrType.IFA_LOCAL, str(ifa.ip)))

        self.send_check_success(msg)

        lst = self.get_ifa_list(iface.ifindex, self.get_family_from_ip(ifa.ip))
        assert len(lst) == 2
        rx_msg_gu = self.find_msg_by_ifa(lst, ifa.ip)
        assert rx_msg_gu is not None

        assert rx_msg_gu.base_hdr.ifa_prefixlen == ifa.network.prefixlen
        assert rx_msg_gu.base_hdr.ifa_scope == RtScope.RT_SCOPE_UNIVERSE.value
        assert rx_msg_gu.get_nla(IfaAttrType.IFA_ADDRESS).addr == str(ifa.ip)

    def test_add_4_carp(self):
        """Tests IPv4 address addition with a CARP vhid attached"""
        ifa = ipaddress.ip_interface("192.0.2.1/24")
        ifa_brd = ifa.network.broadcast_address
        iface = self.vnet.iface_alias_map["if1"]
        vhid = 77

        self.setup_dummy_carp(iface.ifindex, vhid)

        msg = self.create_msg(ifa)
        msg.add_nla(NlAttrIp(IfaAttrType.IFA_LOCAL, str(ifa.ip)))
        msg.add_nla(NlAttrIp(IfaAttrType.IFA_BROADCAST, str(ifa_brd)))
        attrs_bsd = [NlAttrU32(IfafAttrType.IFAF_VHID, vhid)]
        msg.add_nla(NlAttrNested(IfaAttrType.IFA_FREEBSD, attrs_bsd))

        self.send_check_success(msg)

        lst = self.get_ifa_list(iface.ifindex, self.get_family_from_ip(ifa.ip))
        assert len(lst) == 1
        rx_msg = lst[0]

        assert rx_msg.base_hdr.ifa_prefixlen == ifa.network.prefixlen
        assert rx_msg.base_hdr.ifa_scope == RtScope.RT_SCOPE_UNIVERSE.value

        assert rx_msg.get_nla(IfaAttrType.IFA_ADDRESS).addr == str(ifa.ip)
        assert rx_msg.get_nla(IfaAttrType.IFA_LOCAL).addr == str(ifa.ip)
        assert rx_msg.get_nla(IfaAttrType.IFA_BROADCAST).addr == str(ifa_brd)
        ifa_bsd = rx_msg.get_nla(IfaAttrType.IFA_FREEBSD)
        assert ifa_bsd.get_nla(IfafAttrType.IFAF_VHID).u32 == vhid

    def test_add_6_carp(self):
        """Tests IPv6 address addition with a CARP vhid attached"""
        ifa = ipaddress.ip_interface("2001:db8::1/64")
        iface = self.vnet.iface_alias_map["if1"]
        vhid = 77

        self.setup_dummy_carp(iface.ifindex, vhid)

        msg = self.create_msg(ifa)
        msg.add_nla(NlAttrIp(IfaAttrType.IFA_LOCAL, str(ifa.ip)))
        attrs_bsd = [NlAttrU32(IfafAttrType.IFAF_VHID, vhid)]
        msg.add_nla(NlAttrNested(IfaAttrType.IFA_FREEBSD, attrs_bsd))

        self.send_check_success(msg)

        lst = self.get_ifa_list(iface.ifindex, self.get_family_from_ip(ifa.ip))
        assert len(lst) == 2
        rx_msg_gu = self.find_msg_by_ifa(lst, ifa.ip)
        assert rx_msg_gu is not None

        assert rx_msg_gu.base_hdr.ifa_prefixlen == ifa.network.prefixlen
        assert rx_msg_gu.base_hdr.ifa_scope == RtScope.RT_SCOPE_UNIVERSE.value
        assert rx_msg_gu.get_nla(IfaAttrType.IFA_ADDRESS).addr == str(ifa.ip)
        ifa_bsd = rx_msg_gu.get_nla(IfaAttrType.IFA_FREEBSD)
        assert ifa_bsd.get_nla(IfafAttrType.IFAF_VHID).u32 == vhid

    def test_add_6_lifetime(self):
        """Tests IPv6 address addition with preferred/valid lifetimes"""
        ifa = ipaddress.ip_interface("2001:db8::1/64")
        iface = self.vnet.iface_alias_map["if1"]
        pref_time = 43200
        valid_time = 86400

        ci = IfaCacheInfo(ifa_prefered=pref_time, ifa_valid=valid_time)

        msg = self.create_msg(ifa)
        msg.add_nla(NlAttrIp(IfaAttrType.IFA_LOCAL, str(ifa.ip)))
        msg.add_nla(NlAttr(IfaAttrType.IFA_CACHEINFO, bytes(ci)))

        self.send_check_success(msg)

        lst = self.get_ifa_list(iface.ifindex, self.get_family_from_ip(ifa.ip))
        assert len(lst) == 2
        rx_msg = self.find_msg_by_ifa(lst, ifa.ip)
        assert rx_msg is not None

        ci = rx_msg.get_nla(IfaAttrType.IFA_CACHEINFO).ci
        # Allow up to 5 seconds of slack for lifetimes counting down between
        # the add and the readback.
        assert pref_time - 5 <= ci.ifa_prefered <= pref_time
        assert valid_time - 5 <= ci.ifa_valid <= valid_time
        assert ci.cstamp > 0
        assert ci.tstamp > 0
        assert ci.tstamp >= ci.cstamp

    @pytest.mark.parametrize(
        "flags_str",
        [
            "autoconf",
            "deprecated",
            "autoconf,deprecated",
            "prefer_source",
        ],
    )
    def test_add_6_flags(self, flags_str):
        """Tests that IPv6 address flags round-trip through add and list"""
        ifa = ipaddress.ip_interface("2001:db8::1/64")
        iface = self.vnet.iface_alias_map["if1"]

        # Per flag name: "nl" is the value placed in IFA_FLAGS, "f" the value
        # placed in the nested IFAF_FLAGS attribute.
        flags_map = {
            "autoconf": {"nl": 0, "f": IfafFlags6.IN6_IFF_AUTOCONF},
            "deprecated": {
                "nl": IfaFlags.IFA_F_DEPRECATED,
                "f": IfafFlags6.IN6_IFF_DEPRECATED,
            },
            "prefer_source": {"nl": 0, "f": IfafFlags6.IN6_IFF_PREFER_SOURCE},
        }
        nl_flags = 0
        f_flags = 0

        for flag_str in flags_str.split(","):
            d = flags_map.get(flag_str, {})
            nl_flags |= enum_or_int(d.get("nl", 0))
            f_flags |= enum_or_int(d.get("f", 0))

        msg = self.create_msg(ifa)
        msg.add_nla(NlAttrIp(IfaAttrType.IFA_LOCAL, str(ifa.ip)))
        msg.add_nla(NlAttrU32(IfaAttrType.IFA_FLAGS, nl_flags))
        attrs_bsd = [NlAttrU32(IfafAttrType.IFAF_FLAGS, f_flags)]
        msg.add_nla(NlAttrNested(IfaAttrType.IFA_FREEBSD, attrs_bsd))

        self.send_check_success(msg)

        lst = self.get_ifa_list(iface.ifindex, self.get_family_from_ip(ifa.ip))
        assert len(lst) == 2
        rx_msg = self.find_msg_by_ifa(lst, ifa.ip)
        assert rx_msg is not None

        assert rx_msg.get_nla(IfaAttrType.IFA_FLAGS).u32 == nl_flags
        ifa_bsd = rx_msg.get_nla(IfaAttrType.IFA_FREEBSD)
        assert ifa_bsd.get_nla(IfafAttrType.IFAF_FLAGS).u32 == f_flags

    def test_add_4_empty_message(self):
        """Tests correct failure w/ empty message"""
        msg = NetlinkIfaMessage(self.helper, NlRtMsgType.RTM_NEWADDR.value)
        msg.set_request()
        msg.nl_hdr.nlmsg_flags |= (
            NlmNewFlags.NLM_F_EXCL.value | NlmNewFlags.NLM_F_CREATE.value
        )

        rx_msg = self.get_reply(msg)
        assert rx_msg.is_type(NlMsgType.NLMSG_ERROR)
        assert rx_msg.error_code != 0

    def test_add_4_empty_ifindex(self):
        """Tests correct failure w/ empty ifindex"""
        ifa = ipaddress.ip_interface("192.0.2.1/24")
        ifa_brd = ifa.network.broadcast_address

        msg = self.create_msg(ifa)
        msg.base_hdr.ifa_index = 0
        msg.add_nla(NlAttrIp(IfaAttrType.IFA_LOCAL, str(ifa.ip)))
        msg.add_nla(NlAttrIp(IfaAttrType.IFA_BROADCAST, str(ifa_brd)))

        rx_msg = self.get_reply(msg)
        assert rx_msg.is_type(NlMsgType.NLMSG_ERROR)
        assert rx_msg.error_code != 0

    def test_add_4_empty_addr(self):
        """Tests correct failure w/ empty address"""
        ifa = ipaddress.ip_interface("192.0.2.1/24")
        ifa_brd = ifa.network.broadcast_address

        msg = self.create_msg(ifa)
        msg.add_nla(NlAttrIp(IfaAttrType.IFA_BROADCAST, str(ifa_brd)))

        rx_msg = self.get_reply(msg)
        assert rx_msg.is_type(NlMsgType.NLMSG_ERROR)
        assert rx_msg.error_code != 0

    @pytest.mark.parametrize(
        "ifa_str",
        [
            pytest.param("192.0.2.1/32", id="ipv4_host"),
            pytest.param("192.0.2.1/24", id="ipv4_prefix"),
            pytest.param("2001:db8::1/64", id="ipv6_gu_prefix"),
            pytest.param("2001:db8::1/128", id="ipv6_gu_host"),
        ],
    )
    @pytest.mark.parametrize(
        "tlv",
        [
            pytest.param("local", id="ifa_local"),
            pytest.param("address", id="ifa_address"),
        ],
    )
    def test_del(self, tlv, ifa_str):
        """Tests address deletion from the standard broadcast interface"""
        ifa = ipaddress.ip_interface(ifa_str)
        ifa_brd = ifa.network.broadcast_address
        iface = self.vnet.iface_alias_map["if1"]

        # Add the address first and make sure it is listed.
        msg = self.create_msg(ifa)
        msg.add_nla(NlAttrIp(IfaAttrType.IFA_LOCAL, str(ifa.ip)))
        msg.add_nla(NlAttrIp(IfaAttrType.IFA_BROADCAST, str(ifa_brd)))

        self.send_check_success(msg)
        lst = self.get_ifa_list(iface.ifindex, self.get_family_from_ip(ifa.ip))
        rx_msg = self.find_msg_by_ifa(lst, ifa.ip)
        assert rx_msg is not None

        # Delete it, naming the address via the attribute selected by `tlv`.
        msg = NetlinkIfaMessage(self.helper, NlRtMsgType.RTM_DELADDR.value)
        msg.set_request()
        msg.base_hdr.ifa_family = self.get_family_from_ip(ifa.ip)
        msg.base_hdr.ifa_index = iface.ifindex
        msg.base_hdr.ifa_prefixlen = ifa.network.prefixlen

        if tlv == "local":
            msg.add_nla(NlAttrIp(IfaAttrType.IFA_LOCAL, str(ifa.ip)))
        if tlv == "address":
            msg.add_nla(NlAttrIp(IfaAttrType.IFA_ADDRESS, str(ifa.ip)))

        self.send_check_success(msg)
        lst = self.get_ifa_list(iface.ifindex, self.get_family_from_ip(ifa.ip))
        rx_msg = self.find_msg_by_ifa(lst, ifa.ip)
        assert rx_msg is None


class TestRtNlIfaddrOpsP2p(RtnlIfaOps):
    """Address add/delete tests on a point-to-point interface."""

    # NOTE(review): presumably read by the vnet template to create "if1" as
    # a gif(4) interface -- confirm.
    IFTYPE = "gif"

    @pytest.mark.parametrize(
        "ifa_pair",
        [
            pytest.param(["192.0.2.1/24", "192.0.2.2"], id="dst_inside_24"),
            pytest.param(["192.0.2.1/30", "192.0.2.2"], id="dst_inside_30"),
            pytest.param(["192.0.2.1/31", "192.0.2.2"], id="dst_inside_31"),
            pytest.param(["192.0.2.1/32", "192.0.2.2"], id="dst_outside_32"),
            pytest.param(["192.0.2.1/30", "192.0.2.100"], id="dst_outside_30"),
        ],
    )
    def test_add_4(self, ifa_pair):
        """Tests IPv4 address addition to the p2p interface"""
        # ifa_pair is [local address/prefix length, peer address].
        ifa = ipaddress.ip_interface(ifa_pair[0])
        peer_ip = ipaddress.ip_address(ifa_pair[1])
        iface = self.vnet.iface_alias_map["if1"]

        msg = self.create_msg(ifa)
        msg.add_nla(NlAttrIp(IfaAttrType.IFA_LOCAL, str(ifa.ip)))
        msg.add_nla(NlAttrIp(IfaAttrType.IFA_ADDRESS, str(peer_ip)))

        self.send_check_success(msg)

        lst = self.get_ifa_list(iface.ifindex, self.get_family_from_ip(ifa.ip))
        assert len(lst) == 1
        rx_msg = lst[0]

        assert rx_msg.base_hdr.ifa_prefixlen == ifa.network.prefixlen
        assert rx_msg.base_hdr.ifa_scope == RtScope.RT_SCOPE_UNIVERSE.value

        # On p2p interfaces IFA_LOCAL is our address, IFA_ADDRESS the peer.
        assert rx_msg.get_nla(IfaAttrType.IFA_LOCAL).addr == str(ifa.ip)
        assert rx_msg.get_nla(IfaAttrType.IFA_ADDRESS).addr == str(peer_ip)

    @pytest.mark.parametrize(
        "ifa_pair",
        [
            pytest.param(
                ["2001:db8::1/64", "2001:db8::2"],
                id="dst_inside_64",
                marks=pytest.mark.xfail(reason="currently fails"),
            ),
            pytest.param(
                ["2001:db8::1/127", "2001:db8::2"],
                id="dst_inside_127",
                marks=pytest.mark.xfail(reason="currently fails"),
            ),
            pytest.param(["2001:db8::1/128", "2001:db8::2"], id="dst_outside_128"),
            pytest.param(
                ["2001:db8::1/64", "2001:db8:2::2"],
                id="dst_outside_64",
                marks=pytest.mark.xfail(reason="currently fails"),
            ),
        ],
    )
    def test_add_6(self, ifa_pair):
        """Tests IPv6 address addition to the p2p interface"""
        # ifa_pair is [local address/prefix length, peer address].
        ifa = ipaddress.ip_interface(ifa_pair[0])
        peer_ip = ipaddress.ip_address(ifa_pair[1])
        iface = self.vnet.iface_alias_map["if1"]

        msg = self.create_msg(ifa)
        msg.add_nla(NlAttrIp(IfaAttrType.IFA_LOCAL, str(ifa.ip)))
        msg.add_nla(NlAttrIp(IfaAttrType.IFA_ADDRESS, str(peer_ip)))

        self.send_check_success(msg)

        lst = self.get_ifa_list(iface.ifindex, self.get_family_from_ip(ifa.ip))
        assert len(lst) == 2
        rx_msg_gu = self.find_msg_by_ifa(lst, peer_ip)
        assert rx_msg_gu is not None

        assert rx_msg_gu.base_hdr.ifa_prefixlen == ifa.network.prefixlen
        assert rx_msg_gu.base_hdr.ifa_scope == RtScope.RT_SCOPE_UNIVERSE.value
        assert rx_msg_gu.get_nla(IfaAttrType.IFA_LOCAL).addr == str(ifa.ip)
        assert rx_msg_gu.get_nla(IfaAttrType.IFA_ADDRESS).addr == str(peer_ip)

    @pytest.mark.parametrize(
        "ifa_pair",
        [
            pytest.param(["192.0.2.1/30", "192.0.2.2"], id="ipv4_dst_inside_30"),
            pytest.param(["192.0.2.1/32", "192.0.2.2"], id="ipv4_dst_outside_32"),
            pytest.param(["2001:db8::1/128", "2001:db8::2"], id="ip6_dst_outside_128"),
        ],
    )
    @pytest.mark.parametrize(
        "tlv_pair",
        [
            pytest.param(["a", ""], id="ifa_addr=addr"),
            pytest.param(["", "a"], id="ifa_local=addr"),
            pytest.param(["a", "a"], id="ifa_addr=addr,ifa_local=addr"),
        ],
    )
    def test_del(self, tlv_pair, ifa_pair):
        """Tests address deletion from the P2P interface"""
        ifa = ipaddress.ip_interface(ifa_pair[0])
        peer_ip = ipaddress.ip_address(ifa_pair[1])
        iface = self.vnet.iface_alias_map["if1"]
        # tlv_pair is [contents of IFA_ADDRESS, contents of IFA_LOCAL] for
        # the delete request: "a" = our address, "p" = peer address, "" =
        # attribute omitted.  The "p" variants are not parametrized above.
        ifa_addr_str, ifa_local_str = tlv_pair

        # Add the address first and make sure it is listed.
        msg = self.create_msg(ifa)
        msg.add_nla(NlAttrIp(IfaAttrType.IFA_LOCAL, str(ifa.ip)))
        msg.add_nla(NlAttrIp(IfaAttrType.IFA_ADDRESS, str(peer_ip)))

        self.send_check_success(msg)
        lst = self.get_ifa_list(iface.ifindex, self.get_family_from_ip(ifa.ip))
        rx_msg = self.find_msg_by_ifa(lst, peer_ip)
        assert rx_msg is not None

        msg = NetlinkIfaMessage(self.helper, NlRtMsgType.RTM_DELADDR.value)
        msg.set_request()
        msg.base_hdr.ifa_family = self.get_family_from_ip(ifa.ip)
        msg.base_hdr.ifa_index = iface.ifindex
        msg.base_hdr.ifa_prefixlen = ifa.network.prefixlen

        if "a" in ifa_addr_str:
            msg.add_nla(NlAttrIp(IfaAttrType.IFA_ADDRESS, str(ifa.ip)))
        if "p" in ifa_addr_str:
            msg.add_nla(NlAttrIp(IfaAttrType.IFA_ADDRESS, str(peer_ip)))
        if "a" in ifa_local_str:
            msg.add_nla(NlAttrIp(IfaAttrType.IFA_LOCAL, str(ifa.ip)))
        if "p" in ifa_local_str:
            msg.add_nla(NlAttrIp(IfaAttrType.IFA_LOCAL, str(peer_ip)))

        self.send_check_success(msg)
        lst = self.get_ifa_list(iface.ifindex, self.get_family_from_ip(ifa.ip))
        # Listed p2p entries carry the peer in IFA_ADDRESS (that is how the
        # entry was located before the delete), so the lookup must use the
        # peer address; searching for our own address would never match and
        # the assertion would pass even if the delete did nothing.
        rx_msg = self.find_msg_by_ifa(lst, peer_ip)
        assert rx_msg is None


class TestRtNlAddIfaddrLo(RtnlIfaOps):
    """Address add/delete tests on a loopback interface."""

    # NOTE(review): presumably read by the vnet template to create "if1" as
    # a loopback interface -- confirm.
    IFTYPE = "lo"

    @pytest.mark.parametrize(
        "ifa_str",
        [
            pytest.param("192.0.2.1/24", id="prefix"),
            pytest.param("192.0.2.1/32", id="host"),
        ],
    )
    def test_add_4(self, ifa_str):
        """Tests IPv4 address addition to the loopback interface"""
        ifa = ipaddress.ip_interface(ifa_str)
        iface = self.vnet.iface_alias_map["if1"]

        msg = self.create_msg(ifa)
        msg.add_nla(NlAttrIp(IfaAttrType.IFA_LOCAL, str(ifa.ip)))

        self.send_check_success(msg)

        lst = self.get_ifa_list(iface.ifindex, self.get_family_from_ip(ifa.ip))
        assert len(lst) == 1
        rx_msg = lst[0]

        assert rx_msg.base_hdr.ifa_prefixlen == ifa.network.prefixlen
        assert rx_msg.base_hdr.ifa_scope == RtScope.RT_SCOPE_UNIVERSE.value

        assert rx_msg.get_nla(IfaAttrType.IFA_ADDRESS).addr == str(ifa.ip)
        assert rx_msg.get_nla(IfaAttrType.IFA_LOCAL).addr == str(ifa.ip)

    @pytest.mark.parametrize(
        "ifa_str",
        [
            pytest.param("2001:db8::1/64", id="gu_prefix"),
            pytest.param("2001:db8::1/128", id="gu_host"),
        ],
    )
    def test_add_6(self, ifa_str):
        """Tests IPv6 address addition to the loopback interface"""
        ifa = ipaddress.ip_interface(ifa_str)
        iface = self.vnet.iface_alias_map["if1"]

        msg = self.create_msg(ifa)
        msg.add_nla(NlAttrIp(IfaAttrType.IFA_LOCAL, str(ifa.ip)))

        self.send_check_success(msg)

        lst = self.get_ifa_list(iface.ifindex, self.get_family_from_ip(ifa.ip))
        assert len(lst) == 2  # link-local should be auto-created as well
        rx_msg = self.find_msg_by_ifa(lst, ifa.ip)
        assert rx_msg is not None

        assert rx_msg.base_hdr.ifa_prefixlen == ifa.network.prefixlen
        assert rx_msg.base_hdr.ifa_scope == RtScope.RT_SCOPE_UNIVERSE.value
        assert rx_msg.get_nla(IfaAttrType.IFA_ADDRESS).addr == str(ifa.ip)

    @pytest.mark.parametrize(
        "ifa_str",
        [
            pytest.param("192.0.2.1/32", id="ipv4_host"),
            pytest.param("192.0.2.1/24", id="ipv4_prefix"),
            pytest.param("2001:db8::1/64", id="ipv6_gu_prefix"),
            pytest.param("2001:db8::1/128", id="ipv6_gu_host"),
        ],
    )
    @pytest.mark.parametrize(
        "tlv",
        [
            pytest.param("local", id="ifa_local"),
            pytest.param("address", id="ifa_address"),
        ],
    )
    def test_del(self, tlv, ifa_str):
        """Tests address deletion from the loopback interface"""
        ifa = ipaddress.ip_interface(ifa_str)
        iface = self.vnet.iface_alias_map["if1"]

        # Add the address first and make sure it is listed.
        msg = self.create_msg(ifa)
        msg.add_nla(NlAttrIp(IfaAttrType.IFA_LOCAL, str(ifa.ip)))

        self.send_check_success(msg)
        lst = self.get_ifa_list(iface.ifindex, self.get_family_from_ip(ifa.ip))
        rx_msg = self.find_msg_by_ifa(lst, ifa.ip)
        assert rx_msg is not None

        # Delete it, naming the address via the attribute selected by `tlv`.
        msg = NetlinkIfaMessage(self.helper, NlRtMsgType.RTM_DELADDR.value)
        msg.set_request()
        msg.base_hdr.ifa_family = self.get_family_from_ip(ifa.ip)
        msg.base_hdr.ifa_index = iface.ifindex
        msg.base_hdr.ifa_prefixlen = ifa.network.prefixlen

        if tlv == "local":
            msg.add_nla(NlAttrIp(IfaAttrType.IFA_LOCAL, str(ifa.ip)))
        if tlv == "address":
            msg.add_nla(NlAttrIp(IfaAttrType.IFA_ADDRESS, str(ifa.ip)))

        self.send_check_success(msg)
        lst = self.get_ifa_list(iface.ifindex, self.get_family_from_ip(ifa.ip))
        rx_msg = self.find_msg_by_ifa(lst, ifa.ip)
        assert rx_msg is None