1import ipaddress 2import socket 3import struct 4 5import pytest 6from atf_python.sys.net.vnet import SingleVnetTestTemplate 7from atf_python.sys.netlink.attrs import NlAttr 8from atf_python.sys.netlink.attrs import NlAttrIp 9from atf_python.sys.netlink.attrs import NlAttrNested 10from atf_python.sys.netlink.attrs import NlAttrU32 11from atf_python.sys.netlink.base_headers import NlmBaseFlags 12from atf_python.sys.netlink.base_headers import NlmNewFlags 13from atf_python.sys.netlink.base_headers import Nlmsghdr 14from atf_python.sys.netlink.message import NlMsgType 15from atf_python.sys.netlink.netlink import NetlinkTestTemplate 16from atf_python.sys.netlink.netlink import Nlsock 17from atf_python.sys.netlink.netlink_generic import CarpAttrType 18from atf_python.sys.netlink.netlink_generic import CarpGenMessage 19from atf_python.sys.netlink.netlink_generic import CarpMsgType 20from atf_python.sys.netlink.netlink_route import IfaAttrType 21from atf_python.sys.netlink.netlink_route import IfaCacheInfo 22from atf_python.sys.netlink.netlink_route import IfafAttrType 23from atf_python.sys.netlink.netlink_route import IfafFlags6 24from atf_python.sys.netlink.netlink_route import IfaFlags 25from atf_python.sys.netlink.netlink_route import NetlinkIfaMessage 26from atf_python.sys.netlink.netlink_route import NlRtMsgType 27from atf_python.sys.netlink.netlink_route import RtScope 28from atf_python.sys.netlink.utils import enum_or_int 29from atf_python.sys.netlink.utils import NlConst 30 31 32class TestRtNlIfaddrList(NetlinkTestTemplate, SingleVnetTestTemplate): 33 def setup_method(self, method): 34 method_name = method.__name__ 35 if "4" in method_name: 36 if "nofilter" in method_name: 37 self.IPV4_PREFIXES = ["192.0.2.1/24", "169.254.169.254/16"] 38 else: 39 self.IPV4_PREFIXES = ["192.0.2.1/24"] 40 if "6" in method_name: 41 self.IPV6_PREFIXES = ["2001:db8::1/64"] 42 super().setup_method(method) 43 self.setup_netlink(NlConst.NETLINK_ROUTE) 44 45 def test_46_nofilter(self): 46 """Tests that listing outputs both IPv4/IPv6 and interfaces""" 47 msg = NetlinkIfaMessage(self.helper, NlRtMsgType.RTM_GETADDR.value) 48 msg.set_request() 49 self.write_message(msg) 50 51 ret = [] 52 for rx_msg in self.read_msg_list(msg.nl_hdr.nlmsg_seq, NlRtMsgType.RTM_NEWADDR): 53 ifname = socket.if_indextoname(rx_msg.base_hdr.ifa_index) 54 family = rx_msg.base_hdr.ifa_family 55 scope = rx_msg.base_hdr.ifa_scope 56 ret.append((ifname, family, scope)) 57 58 ifname = "lo0" 59 assert len([r for r in ret if r[0] == ifname and r[1] == socket.AF_INET and r[2] == RtScope.RT_SCOPE_HOST.value]) == 1 60 assert len([r for r in ret if r[0] == ifname and r[1] == socket.AF_INET6 and r[2] == RtScope.RT_SCOPE_HOST.value]) == 1 61 assert len([r for r in ret if r[0] == ifname and r[1] == socket.AF_INET6 and r[2] == RtScope.RT_SCOPE_LINK.value]) == 1 62 assert len([r for r in ret if r[0] == ifname]) == 3 63 64 ifname = self.vnet.iface_alias_map["if1"].name 65 assert len([r for r in ret if r[0] == ifname and r[1] == socket.AF_INET and r[2] == RtScope.RT_SCOPE_LINK.value]) == 1 66 assert len([r for r in ret if r[0] == ifname and r[1] == socket.AF_INET and r[2] == RtScope.RT_SCOPE_UNIVERSE.value]) == 1 67 assert len([r for r in ret if r[0] == ifname and r[1] == socket.AF_INET6 and r[2] == RtScope.RT_SCOPE_LINK.value]) == 1 68 assert len([r for r in ret if r[0] == ifname and r[1] == socket.AF_INET6 and r[2] == RtScope.RT_SCOPE_UNIVERSE.value]) == 1 69 assert len([r for r in ret if r[0] == ifname]) == 4 70 71 def test_46_filter_iface(self): 72 """Tests that listing outputs both IPv4/IPv6 for the specific interface""" 73 epair_ifname = self.vnet.iface_alias_map["if1"].name 74 75 msg = NetlinkIfaMessage(self.helper, NlRtMsgType.RTM_GETADDR.value) 76 msg.set_request() 77 msg.base_hdr.ifa_index = socket.if_nametoindex(epair_ifname) 78 self.write_message(msg) 79 80 ret = [] 81 for rx_msg in self.read_msg_list(msg.nl_hdr.nlmsg_seq, NlRtMsgType.RTM_NEWADDR): 82 ifname = socket.if_indextoname(rx_msg.base_hdr.ifa_index) 83 family = rx_msg.base_hdr.ifa_family 84 ret.append((ifname, family, rx_msg)) 85 86 ifname = epair_ifname 87 assert len([r for r in ret if r[0] == ifname and r[1] == socket.AF_INET]) == 1 88 assert len([r for r in ret if r[0] == ifname and r[1] == socket.AF_INET6]) == 2 89 assert len(ret) == 3 90 91 def test_46_filter_family_compat(self): 92 """Tests that family filtering works with the stripped header""" 93 94 hdr = Nlmsghdr( 95 nlmsg_len=17, 96 nlmsg_type=NlRtMsgType.RTM_GETADDR.value, 97 nlmsg_flags=NlmBaseFlags.NLM_F_ACK.value | NlmBaseFlags.NLM_F_REQUEST.value, 98 nlmsg_seq=self.helper.get_seq(), 99 ) 100 data = bytes(hdr) + struct.pack("@B", socket.AF_INET) 101 self.nlsock.write_data(data) 102 103 ret = [] 104 for rx_msg in self.read_msg_list(hdr.nlmsg_seq, NlRtMsgType.RTM_NEWADDR): 105 ifname = socket.if_indextoname(rx_msg.base_hdr.ifa_index) 106 family = rx_msg.base_hdr.ifa_family 107 ret.append((ifname, family, rx_msg)) 108 assert len(ret) == 2 109 110 def filter_iface_family(self, family, num_items): 111 """Tests that listing outputs IPv4 for the specific interface""" 112 epair_ifname = self.vnet.iface_alias_map["if1"].name 113 114 msg = NetlinkIfaMessage(self.helper, NlRtMsgType.RTM_GETADDR.value) 115 msg.set_request() 116 msg.base_hdr.ifa_family = family 117 msg.base_hdr.ifa_index = socket.if_nametoindex(epair_ifname) 118 self.write_message(msg) 119 120 ret = [] 121 for rx_msg in self.read_msg_list(msg.nl_hdr.nlmsg_seq, NlRtMsgType.RTM_NEWADDR): 122 assert family == rx_msg.base_hdr.ifa_family 123 assert epair_ifname == socket.if_indextoname(rx_msg.base_hdr.ifa_index) 124 ret.append(rx_msg) 125 assert len(ret) == num_items 126 return ret 127 128 def test_4_broadcast(self): 129 """Tests header/attr output for listing IPv4 ifas on broadcast iface""" 130 ret = self.filter_iface_family(socket.AF_INET, 1) 131 # Should be 192.0.2.1/24 132 msg = ret[0] 133 # Family and ifindex has been checked already 134 assert msg.base_hdr.ifa_prefixlen == 24 135 # Ignore IFA_FLAGS for now 136 assert msg.base_hdr.ifa_scope == RtScope.RT_SCOPE_UNIVERSE.value 137 138 assert msg.get_nla(IfaAttrType.IFA_ADDRESS).addr == "192.0.2.1" 139 assert msg.get_nla(IfaAttrType.IFA_LOCAL).addr == "192.0.2.1" 140 assert msg.get_nla(IfaAttrType.IFA_BROADCAST).addr == "192.0.2.255" 141 142 epair_ifname = self.vnet.iface_alias_map["if1"].name 143 assert msg.get_nla(IfaAttrType.IFA_LABEL).text == epair_ifname 144 145 def test_6_broadcast(self): 146 """Tests header/attr output for listing IPv6 ifas on broadcast iface""" 147 ret = self.filter_iface_family(socket.AF_INET6, 2) 148 # Should be 192.0.2.1/24 149 if ret[0].base_hdr.ifa_scope == RtScope.RT_SCOPE_UNIVERSE.value: 150 (gmsg, lmsg) = ret 151 else: 152 (lmsg, gmsg) = ret 153 # Start with global ( 2001:db8::1/64 ) 154 msg = gmsg 155 # Family and ifindex has been checked already 156 assert msg.base_hdr.ifa_prefixlen == 64 157 # Ignore IFA_FLAGS for now 158 assert msg.base_hdr.ifa_scope == RtScope.RT_SCOPE_UNIVERSE.value 159 160 assert msg.get_nla(IfaAttrType.IFA_ADDRESS).addr == "2001:db8::1" 161 assert msg.get_nla(IfaAttrType.IFA_LOCAL) is None 162 assert msg.get_nla(IfaAttrType.IFA_BROADCAST) is None 163 164 epair_ifname = self.vnet.iface_alias_map["if1"].name 165 assert msg.get_nla(IfaAttrType.IFA_LABEL).text == epair_ifname 166 167 # Local: fe80::/64 168 msg = lmsg 169 assert msg.base_hdr.ifa_prefixlen == 64 170 # Ignore IFA_FLAGS for now 171 assert msg.base_hdr.ifa_scope == RtScope.RT_SCOPE_LINK.value 172 173 addr = ipaddress.ip_address(msg.get_nla(IfaAttrType.IFA_ADDRESS).addr) 174 assert addr.is_link_local 175 # Verify that ifindex is not emmbedded 176 assert struct.unpack("!H", addr.packed[2:4])[0] == 0 177 assert msg.get_nla(IfaAttrType.IFA_LOCAL) is None 178 assert msg.get_nla(IfaAttrType.IFA_BROADCAST) is None 179 180 epair_ifname = self.vnet.iface_alias_map["if1"].name 181 assert msg.get_nla(IfaAttrType.IFA_LABEL).text == epair_ifname 182 183 184class RtnlIfaOps(NetlinkTestTemplate, SingleVnetTestTemplate): 185 def setup_method(self, method): 186 super().setup_method(method) 187 self.setup_netlink(NlConst.NETLINK_ROUTE) 188 189 def send_check_success(self, msg): 190 rx_msg = self.get_reply(msg) 191 assert rx_msg.is_type(NlMsgType.NLMSG_ERROR) 192 assert rx_msg.error_code == 0 193 194 @staticmethod 195 def get_family_from_ip(ip): 196 if ip.version == 4: 197 return socket.AF_INET 198 return socket.AF_INET6 199 200 def create_msg(self, ifa): 201 iface = self.vnet.iface_alias_map["if1"] 202 203 msg = NetlinkIfaMessage(self.helper, NlRtMsgType.RTM_NEWADDR.value) 204 msg.set_request() 205 msg.nl_hdr.nlmsg_flags |= ( 206 NlmNewFlags.NLM_F_EXCL.value | NlmNewFlags.NLM_F_CREATE.value 207 ) 208 msg.base_hdr.ifa_family = self.get_family_from_ip(ifa.ip) 209 msg.base_hdr.ifa_index = iface.ifindex 210 msg.base_hdr.ifa_prefixlen = ifa.network.prefixlen 211 return msg 212 213 def get_ifa_list(self, ifindex=0, family=0): 214 msg = NetlinkIfaMessage(self.helper, NlRtMsgType.RTM_GETADDR.value) 215 msg.set_request() 216 msg.base_hdr.ifa_family = family 217 msg.base_hdr.ifa_index = ifindex 218 self.write_message(msg) 219 return self.read_msg_list(msg.nl_hdr.nlmsg_seq, NlRtMsgType.RTM_NEWADDR) 220 221 def find_msg_by_ifa(self, msg_list, ip): 222 for msg in msg_list: 223 if msg.get_nla(IfaAttrType.IFA_ADDRESS).addr == str(ip): 224 return msg 225 return None 226 227 def setup_dummy_carp(self, ifindex: int, vhid: int): 228 self.require_module("carp") 229 230 nlsock = Nlsock(NlConst.NETLINK_GENERIC, self.helper) 231 family_id = nlsock.get_genl_family_id("carp") 232 233 msg = CarpGenMessage(self.helper, family_id, CarpMsgType.CARP_NL_CMD_SET) 234 msg.set_request() 235 msg.add_nla(NlAttrU32(CarpAttrType.CARP_NL_VHID, vhid)) 236 msg.add_nla(NlAttrU32(CarpAttrType.CARP_NL_IFINDEX, ifindex)) 237 rx_msg = nlsock.get_reply(msg) 238 239 assert rx_msg.is_type(NlMsgType.NLMSG_ERROR) 240 assert rx_msg.error_code == 0 241 242 243class TestRtNlIfaddrOpsBroadcast(RtnlIfaOps): 244 def test_add_4(self): 245 """Tests IPv4 address addition to the standard broadcast interface""" 246 ifa = ipaddress.ip_interface("192.0.2.1/24") 247 ifa_brd = ifa.network.broadcast_address 248 iface = self.vnet.iface_alias_map["if1"] 249 250 msg = self.create_msg(ifa) 251 msg.add_nla(NlAttrIp(IfaAttrType.IFA_LOCAL, str(ifa.ip))) 252 msg.add_nla(NlAttrIp(IfaAttrType.IFA_BROADCAST, str(ifa_brd))) 253 254 self.send_check_success(msg) 255 256 lst = self.get_ifa_list(iface.ifindex, self.get_family_from_ip(ifa.ip)) 257 assert len(lst) == 1 258 rx_msg = lst[0] 259 260 assert rx_msg.base_hdr.ifa_prefixlen == ifa.network.prefixlen 261 assert rx_msg.base_hdr.ifa_scope == RtScope.RT_SCOPE_UNIVERSE.value 262 263 assert rx_msg.get_nla(IfaAttrType.IFA_ADDRESS).addr == str(ifa.ip) 264 assert rx_msg.get_nla(IfaAttrType.IFA_LOCAL).addr == str(ifa.ip) 265 assert rx_msg.get_nla(IfaAttrType.IFA_BROADCAST).addr == str(ifa_brd) 266 267 @pytest.mark.parametrize( 268 "brd", 269 [ 270 pytest.param((32, True, "192.0.2.1"), id="auto_32"), 271 pytest.param((31, True, "255.255.255.255"), id="auto_31"), 272 pytest.param((30, True, "192.0.2.3"), id="auto_30"), 273 pytest.param((30, False, "192.0.2.2"), id="custom_30"), 274 pytest.param((24, False, "192.0.2.7"), id="custom_24"), 275 ], 276 ) 277 def test_add_4_brd(self, brd): 278 """Tests proper broadcast setup when adding IPv4 ifa""" 279 plen, auto_brd, ifa_brd_str = brd 280 ifa = ipaddress.ip_interface("192.0.2.1/{}".format(plen)) 281 iface = self.vnet.iface_alias_map["if1"] 282 ifa_brd = ipaddress.ip_address(ifa_brd_str) 283 284 msg = self.create_msg(ifa) 285 msg.add_nla(NlAttrIp(IfaAttrType.IFA_LOCAL, str(ifa.ip))) 286 if not auto_brd: 287 msg.add_nla(NlAttrIp(IfaAttrType.IFA_BROADCAST, str(ifa_brd))) 288 289 self.send_check_success(msg) 290 291 lst = self.get_ifa_list(iface.ifindex, self.get_family_from_ip(ifa.ip)) 292 assert len(lst) == 1 293 rx_msg = lst[0] 294 295 assert rx_msg.base_hdr.ifa_prefixlen == ifa.network.prefixlen 296 assert rx_msg.base_hdr.ifa_scope == RtScope.RT_SCOPE_UNIVERSE.value 297 298 assert rx_msg.get_nla(IfaAttrType.IFA_ADDRESS).addr == str(ifa.ip) 299 assert rx_msg.get_nla(IfaAttrType.IFA_LOCAL).addr == str(ifa.ip) 300 assert rx_msg.get_nla(IfaAttrType.IFA_BROADCAST).addr == str(ifa_brd) 301 302 def test_add_6(self): 303 ifa = ipaddress.ip_interface("2001:db8::1/64") 304 iface = self.vnet.iface_alias_map["if1"] 305 306 msg = self.create_msg(ifa) 307 msg.add_nla(NlAttrIp(IfaAttrType.IFA_LOCAL, str(ifa.ip))) 308 309 self.send_check_success(msg) 310 311 lst = self.get_ifa_list(iface.ifindex, self.get_family_from_ip(ifa.ip)) 312 assert len(lst) == 2 313 rx_msg_gu = self.find_msg_by_ifa(lst, ifa.ip) 314 assert rx_msg_gu is not None 315 316 assert rx_msg_gu.base_hdr.ifa_prefixlen == ifa.network.prefixlen 317 assert rx_msg_gu.base_hdr.ifa_scope == RtScope.RT_SCOPE_UNIVERSE.value 318 assert rx_msg_gu.get_nla(IfaAttrType.IFA_ADDRESS).addr == str(ifa.ip) 319 320 def test_add_4_carp(self): 321 ifa = ipaddress.ip_interface("192.0.2.1/24") 322 ifa_brd = ifa.network.broadcast_address 323 iface = self.vnet.iface_alias_map["if1"] 324 vhid = 77 325 326 self.setup_dummy_carp(iface.ifindex, vhid) 327 328 msg = self.create_msg(ifa) 329 msg.add_nla(NlAttrIp(IfaAttrType.IFA_LOCAL, str(ifa.ip))) 330 msg.add_nla(NlAttrIp(IfaAttrType.IFA_BROADCAST, str(ifa_brd))) 331 attrs_bsd = [NlAttrU32(IfafAttrType.IFAF_VHID, vhid)] 332 msg.add_nla(NlAttrNested(IfaAttrType.IFA_FREEBSD, attrs_bsd)) 333 334 self.send_check_success(msg) 335 336 lst = self.get_ifa_list(iface.ifindex, self.get_family_from_ip(ifa.ip)) 337 assert len(lst) == 1 338 rx_msg = lst[0] 339 340 assert rx_msg.base_hdr.ifa_prefixlen == ifa.network.prefixlen 341 assert rx_msg.base_hdr.ifa_scope == RtScope.RT_SCOPE_UNIVERSE.value 342 343 assert rx_msg.get_nla(IfaAttrType.IFA_ADDRESS).addr == str(ifa.ip) 344 assert rx_msg.get_nla(IfaAttrType.IFA_LOCAL).addr == str(ifa.ip) 345 assert rx_msg.get_nla(IfaAttrType.IFA_BROADCAST).addr == str(ifa_brd) 346 ifa_bsd = rx_msg.get_nla(IfaAttrType.IFA_FREEBSD) 347 assert ifa_bsd.get_nla(IfafAttrType.IFAF_VHID).u32 == vhid 348 349 def test_add_6_carp(self): 350 ifa = ipaddress.ip_interface("2001:db8::1/64") 351 iface = self.vnet.iface_alias_map["if1"] 352 vhid = 77 353 354 self.setup_dummy_carp(iface.ifindex, vhid) 355 356 msg = self.create_msg(ifa) 357 msg.add_nla(NlAttrIp(IfaAttrType.IFA_LOCAL, str(ifa.ip))) 358 attrs_bsd = [NlAttrU32(IfafAttrType.IFAF_VHID, vhid)] 359 msg.add_nla(NlAttrNested(IfaAttrType.IFA_FREEBSD, attrs_bsd)) 360 361 self.send_check_success(msg) 362 363 lst = self.get_ifa_list(iface.ifindex, self.get_family_from_ip(ifa.ip)) 364 assert len(lst) == 2 365 rx_msg_gu = self.find_msg_by_ifa(lst, ifa.ip) 366 assert rx_msg_gu is not None 367 368 assert rx_msg_gu.base_hdr.ifa_prefixlen == ifa.network.prefixlen 369 assert rx_msg_gu.base_hdr.ifa_scope == RtScope.RT_SCOPE_UNIVERSE.value 370 assert rx_msg_gu.get_nla(IfaAttrType.IFA_ADDRESS).addr == str(ifa.ip) 371 ifa_bsd = rx_msg_gu.get_nla(IfaAttrType.IFA_FREEBSD) 372 assert ifa_bsd.get_nla(IfafAttrType.IFAF_VHID).u32 == vhid 373 374 def test_add_6_lifetime(self): 375 ifa = ipaddress.ip_interface("2001:db8::1/64") 376 iface = self.vnet.iface_alias_map["if1"] 377 pref_time = 43200 378 valid_time = 86400 379 380 ci = IfaCacheInfo(ifa_prefered=pref_time, ifa_valid=valid_time) 381 382 msg = self.create_msg(ifa) 383 msg.add_nla(NlAttrIp(IfaAttrType.IFA_LOCAL, str(ifa.ip))) 384 msg.add_nla(NlAttr(IfaAttrType.IFA_CACHEINFO, bytes(ci))) 385 386 self.send_check_success(msg) 387 388 lst = self.get_ifa_list(iface.ifindex, self.get_family_from_ip(ifa.ip)) 389 assert len(lst) == 2 390 rx_msg = self.find_msg_by_ifa(lst, ifa.ip) 391 assert rx_msg is not None 392 393 ci = rx_msg.get_nla(IfaAttrType.IFA_CACHEINFO).ci 394 assert pref_time - 5 <= ci.ifa_prefered <= pref_time 395 assert valid_time - 5 <= ci.ifa_valid <= valid_time 396 assert ci.cstamp > 0 397 assert ci.tstamp > 0 398 assert ci.tstamp >= ci.cstamp 399 400 @pytest.mark.parametrize( 401 "flags_str", 402 [ 403 "autoconf", 404 "deprecated", 405 "autoconf,deprecated", 406 "prefer_source", 407 ], 408 ) 409 def test_add_6_flags(self, flags_str): 410 ifa = ipaddress.ip_interface("2001:db8::1/64") 411 iface = self.vnet.iface_alias_map["if1"] 412 413 flags_map = { 414 "autoconf": {"nl": 0, "f": IfafFlags6.IN6_IFF_AUTOCONF}, 415 "deprecated": { 416 "nl": IfaFlags.IFA_F_DEPRECATED, 417 "f": IfafFlags6.IN6_IFF_DEPRECATED, 418 }, 419 "prefer_source": {"nl": 0, "f": IfafFlags6.IN6_IFF_PREFER_SOURCE}, 420 } 421 nl_flags = 0 422 f_flags = 0 423 424 for flag_str in flags_str.split(","): 425 d = flags_map.get(flag_str, {}) 426 nl_flags |= enum_or_int(d.get("nl", 0)) 427 f_flags |= enum_or_int(d.get("f", 0)) 428 429 msg = self.create_msg(ifa) 430 msg.add_nla(NlAttrIp(IfaAttrType.IFA_LOCAL, str(ifa.ip))) 431 msg.add_nla(NlAttrU32(IfaAttrType.IFA_FLAGS, nl_flags)) 432 attrs_bsd = [NlAttrU32(IfafAttrType.IFAF_FLAGS, f_flags)] 433 msg.add_nla(NlAttrNested(IfaAttrType.IFA_FREEBSD, attrs_bsd)) 434 435 self.send_check_success(msg) 436 437 lst = self.get_ifa_list(iface.ifindex, self.get_family_from_ip(ifa.ip)) 438 assert len(lst) == 2 439 rx_msg = self.find_msg_by_ifa(lst, ifa.ip) 440 assert rx_msg is not None 441 442 assert rx_msg.get_nla(IfaAttrType.IFA_FLAGS).u32 == nl_flags 443 ifa_bsd = rx_msg.get_nla(IfaAttrType.IFA_FREEBSD) 444 assert ifa_bsd.get_nla(IfafAttrType.IFAF_FLAGS).u32 == f_flags 445 446 def test_add_4_empty_message(self): 447 """Tests correct failure w/ empty message""" 448 msg = NetlinkIfaMessage(self.helper, NlRtMsgType.RTM_NEWADDR.value) 449 msg.set_request() 450 msg.nl_hdr.nlmsg_flags |= ( 451 NlmNewFlags.NLM_F_EXCL.value | NlmNewFlags.NLM_F_CREATE.value 452 ) 453 454 rx_msg = self.get_reply(msg) 455 assert rx_msg.is_type(NlMsgType.NLMSG_ERROR) 456 assert rx_msg.error_code != 0 457 458 def test_add_4_empty_ifindex(self): 459 """Tests correct failure w/ empty ifindex""" 460 ifa = ipaddress.ip_interface("192.0.2.1/24") 461 ifa_brd = ifa.network.broadcast_address 462 463 msg = self.create_msg(ifa) 464 msg.base_hdr.ifa_index = 0 465 msg.add_nla(NlAttrIp(IfaAttrType.IFA_LOCAL, str(ifa.ip))) 466 msg.add_nla(NlAttrIp(IfaAttrType.IFA_BROADCAST, str(ifa_brd))) 467 468 rx_msg = self.get_reply(msg) 469 assert rx_msg.is_type(NlMsgType.NLMSG_ERROR) 470 assert rx_msg.error_code != 0 471 472 def test_add_4_empty_addr(self): 473 """Tests correct failure w/ empty address""" 474 ifa = ipaddress.ip_interface("192.0.2.1/24") 475 ifa_brd = ifa.network.broadcast_address 476 477 msg = self.create_msg(ifa) 478 msg.add_nla(NlAttrIp(IfaAttrType.IFA_BROADCAST, str(ifa_brd))) 479 480 rx_msg = self.get_reply(msg) 481 assert rx_msg.is_type(NlMsgType.NLMSG_ERROR) 482 assert rx_msg.error_code != 0 483 484 @pytest.mark.parametrize( 485 "ifa_str", 486 [ 487 pytest.param("192.0.2.1/32", id="ipv4_host"), 488 pytest.param("192.0.2.1/24", id="ipv4_prefix"), 489 pytest.param("2001:db8::1/64", id="ipv6_gu_prefix"), 490 pytest.param("2001:db8::1/128", id="ipv6_gu_host"), 491 ], 492 ) 493 @pytest.mark.parametrize( 494 "tlv", 495 [ 496 pytest.param("local", id="ifa_local"), 497 pytest.param("address", id="ifa_address"), 498 ], 499 ) 500 def test_del(self, tlv, ifa_str): 501 """Tests address deletion from the standard broadcast interface""" 502 ifa = ipaddress.ip_interface(ifa_str) 503 ifa_brd = ifa.network.broadcast_address 504 iface = self.vnet.iface_alias_map["if1"] 505 506 msg = self.create_msg(ifa) 507 msg.add_nla(NlAttrIp(IfaAttrType.IFA_LOCAL, str(ifa.ip))) 508 msg.add_nla(NlAttrIp(IfaAttrType.IFA_BROADCAST, str(ifa_brd))) 509 510 self.send_check_success(msg) 511 lst = self.get_ifa_list(iface.ifindex, self.get_family_from_ip(ifa.ip)) 512 rx_msg = self.find_msg_by_ifa(lst, ifa.ip) 513 assert rx_msg is not None 514 515 msg = NetlinkIfaMessage(self.helper, NlRtMsgType.RTM_DELADDR.value) 516 msg.set_request() 517 msg.base_hdr.ifa_family = self.get_family_from_ip(ifa.ip) 518 msg.base_hdr.ifa_index = iface.ifindex 519 msg.base_hdr.ifa_prefixlen = ifa.network.prefixlen 520 521 if tlv == "local": 522 msg.add_nla(NlAttrIp(IfaAttrType.IFA_LOCAL, str(ifa.ip))) 523 if tlv == "address": 524 msg.add_nla(NlAttrIp(IfaAttrType.IFA_ADDRESS, str(ifa.ip))) 525 526 self.send_check_success(msg) 527 lst = self.get_ifa_list(iface.ifindex, self.get_family_from_ip(ifa.ip)) 528 rx_msg = self.find_msg_by_ifa(lst, ifa.ip) 529 assert rx_msg is None 530 531 532class TestRtNlIfaddrOpsP2p(RtnlIfaOps): 533 IFTYPE = "gif" 534 535 @pytest.mark.parametrize( 536 "ifa_pair", 537 [ 538 pytest.param(["192.0.2.1/24", "192.0.2.2"], id="dst_inside_24"), 539 pytest.param(["192.0.2.1/30", "192.0.2.2"], id="dst_inside_30"), 540 pytest.param(["192.0.2.1/31", "192.0.2.2"], id="dst_inside_31"), 541 pytest.param(["192.0.2.1/32", "192.0.2.2"], id="dst_outside_32"), 542 pytest.param(["192.0.2.1/30", "192.0.2.100"], id="dst_outside_30"), 543 ], 544 ) 545 def test_add_4(self, ifa_pair): 546 """Tests IPv4 address addition to the p2p interface""" 547 ifa = ipaddress.ip_interface(ifa_pair[0]) 548 peer_ip = ipaddress.ip_address(ifa_pair[1]) 549 iface = self.vnet.iface_alias_map["if1"] 550 551 msg = self.create_msg(ifa) 552 msg.add_nla(NlAttrIp(IfaAttrType.IFA_LOCAL, str(ifa.ip))) 553 msg.add_nla(NlAttrIp(IfaAttrType.IFA_ADDRESS, str(peer_ip))) 554 555 self.send_check_success(msg) 556 557 lst = self.get_ifa_list(iface.ifindex, self.get_family_from_ip(ifa.ip)) 558 assert len(lst) == 1 559 rx_msg = lst[0] 560 561 assert rx_msg.base_hdr.ifa_prefixlen == ifa.network.prefixlen 562 assert rx_msg.base_hdr.ifa_scope == RtScope.RT_SCOPE_UNIVERSE.value 563 564 assert rx_msg.get_nla(IfaAttrType.IFA_LOCAL).addr == str(ifa.ip) 565 assert rx_msg.get_nla(IfaAttrType.IFA_ADDRESS).addr == str(peer_ip) 566 567 @pytest.mark.parametrize( 568 "ifa_pair", 569 [ 570 pytest.param( 571 ["2001:db8::1/64", "2001:db8::2"], 572 id="dst_inside_64", 573 marks=pytest.mark.xfail(reason="currently fails"), 574 ), 575 pytest.param( 576 ["2001:db8::1/127", "2001:db8::2"], 577 id="dst_inside_127", 578 marks=pytest.mark.xfail(reason="currently fails"), 579 ), 580 pytest.param(["2001:db8::1/128", "2001:db8::2"], id="dst_outside_128"), 581 pytest.param( 582 ["2001:db8::1/64", "2001:db8:2::2"], 583 id="dst_outside_64", 584 marks=pytest.mark.xfail(reason="currently fails"), 585 ), 586 ], 587 ) 588 def test_add_6(self, ifa_pair): 589 """Tests IPv6 address addition to the p2p interface""" 590 ifa = ipaddress.ip_interface(ifa_pair[0]) 591 peer_ip = ipaddress.ip_address(ifa_pair[1]) 592 iface = self.vnet.iface_alias_map["if1"] 593 594 msg = self.create_msg(ifa) 595 msg.add_nla(NlAttrIp(IfaAttrType.IFA_LOCAL, str(ifa.ip))) 596 msg.add_nla(NlAttrIp(IfaAttrType.IFA_ADDRESS, str(peer_ip))) 597 598 self.send_check_success(msg) 599 600 lst = self.get_ifa_list(iface.ifindex, self.get_family_from_ip(ifa.ip)) 601 assert len(lst) == 2 602 rx_msg_gu = self.find_msg_by_ifa(lst, peer_ip) 603 assert rx_msg_gu is not None 604 605 assert rx_msg_gu.base_hdr.ifa_prefixlen == ifa.network.prefixlen 606 assert rx_msg_gu.base_hdr.ifa_scope == RtScope.RT_SCOPE_UNIVERSE.value 607 assert rx_msg_gu.get_nla(IfaAttrType.IFA_LOCAL).addr == str(ifa.ip) 608 assert rx_msg_gu.get_nla(IfaAttrType.IFA_ADDRESS).addr == str(peer_ip) 609 610 @pytest.mark.parametrize( 611 "ifa_pair", 612 [ 613 pytest.param(["192.0.2.1/30", "192.0.2.2"], id="ipv4_dst_inside_30"), 614 pytest.param(["192.0.2.1/32", "192.0.2.2"], id="ipv4_dst_outside_32"), 615 pytest.param(["2001:db8::1/128", "2001:db8::2"], id="ip6_dst_outside_128"), 616 ], 617 ) 618 @pytest.mark.parametrize( 619 "tlv_pair", 620 [ 621 pytest.param(["a", ""], id="ifa_addr=addr"), 622 pytest.param(["", "a"], id="ifa_local=addr"), 623 pytest.param(["a", "a"], id="ifa_addr=addr,ifa_local=addr"), 624 ], 625 ) 626 def test_del(self, tlv_pair, ifa_pair): 627 """Tests address deletion from the P2P interface""" 628 ifa = ipaddress.ip_interface(ifa_pair[0]) 629 peer_ip = ipaddress.ip_address(ifa_pair[1]) 630 iface = self.vnet.iface_alias_map["if1"] 631 ifa_addr_str, ifa_local_str = tlv_pair 632 633 msg = self.create_msg(ifa) 634 msg.add_nla(NlAttrIp(IfaAttrType.IFA_LOCAL, str(ifa.ip))) 635 msg.add_nla(NlAttrIp(IfaAttrType.IFA_ADDRESS, str(peer_ip))) 636 637 self.send_check_success(msg) 638 lst = self.get_ifa_list(iface.ifindex, self.get_family_from_ip(ifa.ip)) 639 rx_msg = self.find_msg_by_ifa(lst, peer_ip) 640 assert rx_msg is not None 641 642 msg = NetlinkIfaMessage(self.helper, NlRtMsgType.RTM_DELADDR.value) 643 msg.set_request() 644 msg.base_hdr.ifa_family = self.get_family_from_ip(ifa.ip) 645 msg.base_hdr.ifa_index = iface.ifindex 646 msg.base_hdr.ifa_prefixlen = ifa.network.prefixlen 647 648 if "a" in ifa_addr_str: 649 msg.add_nla(NlAttrIp(IfaAttrType.IFA_ADDRESS, str(ifa.ip))) 650 if "p" in ifa_addr_str: 651 msg.add_nla(NlAttrIp(IfaAttrType.IFA_ADDRESS, str(peer_ip))) 652 if "a" in ifa_local_str: 653 msg.add_nla(NlAttrIp(IfaAttrType.IFA_LOCAL, str(ifa.ip))) 654 if "p" in ifa_local_str: 655 msg.add_nla(NlAttrIp(IfaAttrType.IFA_LOCAL, str(peer_ip))) 656 657 self.send_check_success(msg) 658 lst = self.get_ifa_list(iface.ifindex, self.get_family_from_ip(ifa.ip)) 659 rx_msg = self.find_msg_by_ifa(lst, ifa.ip) 660 assert rx_msg is None 661 662 663class TestRtNlAddIfaddrLo(RtnlIfaOps): 664 IFTYPE = "lo" 665 666 @pytest.mark.parametrize( 667 "ifa_str", 668 [ 669 pytest.param("192.0.2.1/24", id="prefix"), 670 pytest.param("192.0.2.1/32", id="host"), 671 ], 672 ) 673 def test_add_4(self, ifa_str): 674 """Tests IPv4 address addition to the loopback interface""" 675 ifa = ipaddress.ip_interface(ifa_str) 676 iface = self.vnet.iface_alias_map["if1"] 677 678 msg = self.create_msg(ifa) 679 msg.add_nla(NlAttrIp(IfaAttrType.IFA_LOCAL, str(ifa.ip))) 680 681 self.send_check_success(msg) 682 683 lst = self.get_ifa_list(iface.ifindex, self.get_family_from_ip(ifa.ip)) 684 assert len(lst) == 1 685 rx_msg = lst[0] 686 687 assert rx_msg.base_hdr.ifa_prefixlen == ifa.network.prefixlen 688 assert rx_msg.base_hdr.ifa_scope == RtScope.RT_SCOPE_UNIVERSE.value 689 690 assert rx_msg.get_nla(IfaAttrType.IFA_ADDRESS).addr == str(ifa.ip) 691 assert rx_msg.get_nla(IfaAttrType.IFA_LOCAL).addr == str(ifa.ip) 692 693 @pytest.mark.parametrize( 694 "ifa_str", 695 [ 696 pytest.param("2001:db8::1/64", id="gu_prefix"), 697 pytest.param("2001:db8::1/128", id="gu_host"), 698 ], 699 ) 700 def test_add_6(self, ifa_str): 701 """Tests IPv6 address addition to the loopback interface""" 702 ifa = ipaddress.ip_interface(ifa_str) 703 iface = self.vnet.iface_alias_map["if1"] 704 705 msg = self.create_msg(ifa) 706 msg.add_nla(NlAttrIp(IfaAttrType.IFA_LOCAL, str(ifa.ip))) 707 708 self.send_check_success(msg) 709 710 lst = self.get_ifa_list(iface.ifindex, self.get_family_from_ip(ifa.ip)) 711 assert len(lst) == 2 # link-local should be auto-created as well 712 rx_msg = self.find_msg_by_ifa(lst, ifa.ip) 713 assert rx_msg is not None 714 715 assert rx_msg.base_hdr.ifa_prefixlen == ifa.network.prefixlen 716 assert rx_msg.base_hdr.ifa_scope == RtScope.RT_SCOPE_UNIVERSE.value 717 assert rx_msg.get_nla(IfaAttrType.IFA_ADDRESS).addr == str(ifa.ip) 718 719 @pytest.mark.parametrize( 720 "ifa_str", 721 [ 722 pytest.param("192.0.2.1/32", id="ipv4_host"), 723 pytest.param("192.0.2.1/24", id="ipv4_prefix"), 724 pytest.param("2001:db8::1/64", id="ipv6_gu_prefix"), 725 pytest.param("2001:db8::1/128", id="ipv6_gu_host"), 726 ], 727 ) 728 @pytest.mark.parametrize( 729 "tlv", 730 [ 731 pytest.param("local", id="ifa_local"), 732 pytest.param("address", id="ifa_address"), 733 ], 734 ) 735 def test_del(self, tlv, ifa_str): 736 """Tests address deletion from the loopback interface""" 737 ifa = ipaddress.ip_interface(ifa_str) 738 iface = self.vnet.iface_alias_map["if1"] 739 740 msg = self.create_msg(ifa) 741 msg.add_nla(NlAttrIp(IfaAttrType.IFA_LOCAL, str(ifa.ip))) 742 743 self.send_check_success(msg) 744 lst = self.get_ifa_list(iface.ifindex, self.get_family_from_ip(ifa.ip)) 745 rx_msg = self.find_msg_by_ifa(lst, ifa.ip) 746 assert rx_msg is not None 747 748 msg = NetlinkIfaMessage(self.helper, NlRtMsgType.RTM_DELADDR.value) 749 msg.set_request() 750 msg.base_hdr.ifa_family = self.get_family_from_ip(ifa.ip) 751 msg.base_hdr.ifa_index = iface.ifindex 752 msg.base_hdr.ifa_prefixlen = ifa.network.prefixlen 753 754 if tlv == "local": 755 msg.add_nla(NlAttrIp(IfaAttrType.IFA_LOCAL, str(ifa.ip))) 756 if tlv == "address": 757 msg.add_nla(NlAttrIp(IfaAttrType.IFA_ADDRESS, str(ifa.ip))) 758 759 self.send_check_success(msg) 760 lst = self.get_ifa_list(iface.ifindex, self.get_family_from_ip(ifa.ip)) 761 rx_msg = self.find_msg_by_ifa(lst, ifa.ip) 762 assert rx_msg is None 763