1c1871a33SAlexander V. Chernikovimport ipaddress 2c1871a33SAlexander V. Chernikovimport socket 3c1871a33SAlexander V. Chernikovimport struct 4c1871a33SAlexander V. Chernikov 510b94e40SAlexander V. Chernikovimport pytest 6c1871a33SAlexander V. Chernikovfrom atf_python.sys.net.vnet import SingleVnetTestTemplate 710b94e40SAlexander V. Chernikovfrom atf_python.sys.netlink.attrs import NlAttr 810b94e40SAlexander V. Chernikovfrom atf_python.sys.netlink.attrs import NlAttrIp 910b94e40SAlexander V. Chernikovfrom atf_python.sys.netlink.attrs import NlAttrNested 1010b94e40SAlexander V. Chernikovfrom atf_python.sys.netlink.attrs import NlAttrU32 11fee65b7eSAlexander V. Chernikovfrom atf_python.sys.netlink.base_headers import NlmBaseFlags 1210b94e40SAlexander V. Chernikovfrom atf_python.sys.netlink.base_headers import NlmNewFlags 13fee65b7eSAlexander V. Chernikovfrom atf_python.sys.netlink.base_headers import Nlmsghdr 1410b94e40SAlexander V. Chernikovfrom atf_python.sys.netlink.message import NlMsgType 15fee65b7eSAlexander V. Chernikovfrom atf_python.sys.netlink.netlink import NetlinkTestTemplate 1610b94e40SAlexander V. Chernikovfrom atf_python.sys.netlink.netlink import Nlsock 1710b94e40SAlexander V. Chernikovfrom atf_python.sys.netlink.netlink_generic import CarpAttrType 1810b94e40SAlexander V. Chernikovfrom atf_python.sys.netlink.netlink_generic import CarpGenMessage 1910b94e40SAlexander V. Chernikovfrom atf_python.sys.netlink.netlink_generic import CarpMsgType 20d91f8db5SAlexander V. Chernikovfrom atf_python.sys.netlink.netlink_route import IfaAttrType 2110b94e40SAlexander V. Chernikovfrom atf_python.sys.netlink.netlink_route import IfaCacheInfo 2210b94e40SAlexander V. Chernikovfrom atf_python.sys.netlink.netlink_route import IfafAttrType 2310b94e40SAlexander V. Chernikovfrom atf_python.sys.netlink.netlink_route import IfafFlags6 2410b94e40SAlexander V. Chernikovfrom atf_python.sys.netlink.netlink_route import IfaFlags 25fee65b7eSAlexander V. Chernikovfrom atf_python.sys.netlink.netlink_route import NetlinkIfaMessage 26fee65b7eSAlexander V. Chernikovfrom atf_python.sys.netlink.netlink_route import NlRtMsgType 27fee65b7eSAlexander V. Chernikovfrom atf_python.sys.netlink.netlink_route import RtScope 2810b94e40SAlexander V. Chernikovfrom atf_python.sys.netlink.utils import enum_or_int 29fee65b7eSAlexander V. Chernikovfrom atf_python.sys.netlink.utils import NlConst 30c1871a33SAlexander V. Chernikov 31c1871a33SAlexander V. Chernikov 3210b94e40SAlexander V. Chernikovclass TestRtNlIfaddrList(NetlinkTestTemplate, SingleVnetTestTemplate): 33c1871a33SAlexander V. Chernikov def setup_method(self, method): 34c1871a33SAlexander V. Chernikov method_name = method.__name__ 35c1871a33SAlexander V. Chernikov if "4" in method_name: 36c1871a33SAlexander V. Chernikov self.IPV4_PREFIXES = ["192.0.2.1/24"] 37c1871a33SAlexander V. Chernikov if "6" in method_name: 38c1871a33SAlexander V. Chernikov self.IPV6_PREFIXES = ["2001:db8::1/64"] 39c1871a33SAlexander V. Chernikov super().setup_method(method) 40c1871a33SAlexander V. Chernikov self.setup_netlink(NlConst.NETLINK_ROUTE) 41c1871a33SAlexander V. Chernikov 42c1871a33SAlexander V. Chernikov def test_46_nofilter(self): 43c1871a33SAlexander V. Chernikov """Tests that listing outputs both IPv4/IPv6 and interfaces""" 44c1871a33SAlexander V. Chernikov msg = NetlinkIfaMessage(self.helper, NlRtMsgType.RTM_GETADDR.value) 4510b94e40SAlexander V. Chernikov msg.set_request() 46c1871a33SAlexander V. Chernikov self.write_message(msg) 47c1871a33SAlexander V. Chernikov 48c1871a33SAlexander V. Chernikov ret = [] 49c1871a33SAlexander V. Chernikov for rx_msg in self.read_msg_list(msg.nl_hdr.nlmsg_seq, NlRtMsgType.RTM_NEWADDR): 50c1871a33SAlexander V. Chernikov ifname = socket.if_indextoname(rx_msg.base_hdr.ifa_index) 51c1871a33SAlexander V. Chernikov family = rx_msg.base_hdr.ifa_family 52c1871a33SAlexander V. Chernikov ret.append((ifname, family, rx_msg)) 53c1871a33SAlexander V. Chernikov 54c1871a33SAlexander V. Chernikov ifname = "lo0" 55c1871a33SAlexander V. Chernikov assert len([r for r in ret if r[0] == ifname]) > 0 56c1871a33SAlexander V. Chernikov 57c1871a33SAlexander V. Chernikov ifname = self.vnet.iface_alias_map["if1"].name 58c1871a33SAlexander V. Chernikov assert len([r for r in ret if r[0] == ifname and r[1] == socket.AF_INET]) == 1 59c1871a33SAlexander V. Chernikov assert len([r for r in ret if r[0] == ifname and r[1] == socket.AF_INET6]) == 2 60c1871a33SAlexander V. Chernikov 61c1871a33SAlexander V. Chernikov def test_46_filter_iface(self): 62c1871a33SAlexander V. Chernikov """Tests that listing outputs both IPv4/IPv6 for the specific interface""" 63c1871a33SAlexander V. Chernikov epair_ifname = self.vnet.iface_alias_map["if1"].name 64c1871a33SAlexander V. Chernikov 65c1871a33SAlexander V. Chernikov msg = NetlinkIfaMessage(self.helper, NlRtMsgType.RTM_GETADDR.value) 6610b94e40SAlexander V. Chernikov msg.set_request() 67c1871a33SAlexander V. Chernikov msg.base_hdr.ifa_index = socket.if_nametoindex(epair_ifname) 68c1871a33SAlexander V. Chernikov self.write_message(msg) 69c1871a33SAlexander V. Chernikov 70c1871a33SAlexander V. Chernikov ret = [] 71c1871a33SAlexander V. Chernikov for rx_msg in self.read_msg_list(msg.nl_hdr.nlmsg_seq, NlRtMsgType.RTM_NEWADDR): 72c1871a33SAlexander V. Chernikov ifname = socket.if_indextoname(rx_msg.base_hdr.ifa_index) 73c1871a33SAlexander V. Chernikov family = rx_msg.base_hdr.ifa_family 74c1871a33SAlexander V. Chernikov ret.append((ifname, family, rx_msg)) 75c1871a33SAlexander V. Chernikov 76c1871a33SAlexander V. Chernikov ifname = epair_ifname 77c1871a33SAlexander V. Chernikov assert len([r for r in ret if r[0] == ifname and r[1] == socket.AF_INET]) == 1 78c1871a33SAlexander V. Chernikov assert len([r for r in ret if r[0] == ifname and r[1] == socket.AF_INET6]) == 2 79c1871a33SAlexander V. Chernikov assert len(ret) == 3 80c1871a33SAlexander V. Chernikov 81228c632aSAlexander V. Chernikov def test_46_filter_family_compat(self): 82228c632aSAlexander V. Chernikov """Tests that family filtering works with the stripped header""" 83228c632aSAlexander V. Chernikov 84228c632aSAlexander V. Chernikov hdr = Nlmsghdr( 85228c632aSAlexander V. Chernikov nlmsg_len=17, 86228c632aSAlexander V. Chernikov nlmsg_type=NlRtMsgType.RTM_GETADDR.value, 87228c632aSAlexander V. Chernikov nlmsg_flags=NlmBaseFlags.NLM_F_ACK.value | NlmBaseFlags.NLM_F_REQUEST.value, 8810b94e40SAlexander V. Chernikov nlmsg_seq=self.helper.get_seq(), 89228c632aSAlexander V. Chernikov ) 90228c632aSAlexander V. Chernikov data = bytes(hdr) + struct.pack("@B", socket.AF_INET) 91228c632aSAlexander V. Chernikov self.nlsock.write_data(data) 92228c632aSAlexander V. Chernikov 93228c632aSAlexander V. Chernikov ret = [] 94228c632aSAlexander V. Chernikov for rx_msg in self.read_msg_list(hdr.nlmsg_seq, NlRtMsgType.RTM_NEWADDR): 95228c632aSAlexander V. Chernikov ifname = socket.if_indextoname(rx_msg.base_hdr.ifa_index) 96228c632aSAlexander V. Chernikov family = rx_msg.base_hdr.ifa_family 97228c632aSAlexander V. Chernikov ret.append((ifname, family, rx_msg)) 98228c632aSAlexander V. Chernikov assert len(ret) == 2 99228c632aSAlexander V. Chernikov 100c1871a33SAlexander V. Chernikov def filter_iface_family(self, family, num_items): 101c1871a33SAlexander V. Chernikov """Tests that listing outputs IPv4 for the specific interface""" 102c1871a33SAlexander V. Chernikov epair_ifname = self.vnet.iface_alias_map["if1"].name 103c1871a33SAlexander V. Chernikov 104c1871a33SAlexander V. Chernikov msg = NetlinkIfaMessage(self.helper, NlRtMsgType.RTM_GETADDR.value) 10510b94e40SAlexander V. Chernikov msg.set_request() 106c1871a33SAlexander V. Chernikov msg.base_hdr.ifa_family = family 107c1871a33SAlexander V. Chernikov msg.base_hdr.ifa_index = socket.if_nametoindex(epair_ifname) 108c1871a33SAlexander V. Chernikov self.write_message(msg) 109c1871a33SAlexander V. Chernikov 110c1871a33SAlexander V. Chernikov ret = [] 111c1871a33SAlexander V. Chernikov for rx_msg in self.read_msg_list(msg.nl_hdr.nlmsg_seq, NlRtMsgType.RTM_NEWADDR): 112c1871a33SAlexander V. Chernikov assert family == rx_msg.base_hdr.ifa_family 113c1871a33SAlexander V. Chernikov assert epair_ifname == socket.if_indextoname(rx_msg.base_hdr.ifa_index) 114c1871a33SAlexander V. Chernikov ret.append(rx_msg) 115c1871a33SAlexander V. Chernikov assert len(ret) == num_items 116c1871a33SAlexander V. Chernikov return ret 117c1871a33SAlexander V. Chernikov 118c1871a33SAlexander V. Chernikov def test_4_broadcast(self): 119c1871a33SAlexander V. Chernikov """Tests header/attr output for listing IPv4 ifas on broadcast iface""" 120c1871a33SAlexander V. Chernikov ret = self.filter_iface_family(socket.AF_INET, 1) 121c1871a33SAlexander V. Chernikov # Should be 192.0.2.1/24 122c1871a33SAlexander V. Chernikov msg = ret[0] 123c1871a33SAlexander V. Chernikov # Family and ifindex has been checked already 124c1871a33SAlexander V. Chernikov assert msg.base_hdr.ifa_prefixlen == 24 125c1871a33SAlexander V. Chernikov # Ignore IFA_FLAGS for now 126c1871a33SAlexander V. Chernikov assert msg.base_hdr.ifa_scope == RtScope.RT_SCOPE_UNIVERSE.value 127c1871a33SAlexander V. Chernikov 128d91f8db5SAlexander V. Chernikov assert msg.get_nla(IfaAttrType.IFA_ADDRESS).addr == "192.0.2.1" 129d91f8db5SAlexander V. Chernikov assert msg.get_nla(IfaAttrType.IFA_LOCAL).addr == "192.0.2.1" 130d91f8db5SAlexander V. Chernikov assert msg.get_nla(IfaAttrType.IFA_BROADCAST).addr == "192.0.2.255" 131c1871a33SAlexander V. Chernikov 132c1871a33SAlexander V. Chernikov epair_ifname = self.vnet.iface_alias_map["if1"].name 133d91f8db5SAlexander V. Chernikov assert msg.get_nla(IfaAttrType.IFA_LABEL).text == epair_ifname 134c1871a33SAlexander V. Chernikov 135c1871a33SAlexander V. Chernikov def test_6_broadcast(self): 136c1871a33SAlexander V. Chernikov """Tests header/attr output for listing IPv6 ifas on broadcast iface""" 137c1871a33SAlexander V. Chernikov ret = self.filter_iface_family(socket.AF_INET6, 2) 138c1871a33SAlexander V. Chernikov # Should be 192.0.2.1/24 139c1871a33SAlexander V. Chernikov if ret[0].base_hdr.ifa_scope == RtScope.RT_SCOPE_UNIVERSE.value: 140c1871a33SAlexander V. Chernikov (gmsg, lmsg) = ret 141c1871a33SAlexander V. Chernikov else: 142c1871a33SAlexander V. Chernikov (lmsg, gmsg) = ret 143c1871a33SAlexander V. Chernikov # Start with global ( 2001:db8::1/64 ) 144c1871a33SAlexander V. Chernikov msg = gmsg 145c1871a33SAlexander V. Chernikov # Family and ifindex has been checked already 146c1871a33SAlexander V. Chernikov assert msg.base_hdr.ifa_prefixlen == 64 147c1871a33SAlexander V. Chernikov # Ignore IFA_FLAGS for now 148c1871a33SAlexander V. Chernikov assert msg.base_hdr.ifa_scope == RtScope.RT_SCOPE_UNIVERSE.value 149c1871a33SAlexander V. Chernikov 150d91f8db5SAlexander V. Chernikov assert msg.get_nla(IfaAttrType.IFA_ADDRESS).addr == "2001:db8::1" 151d91f8db5SAlexander V. Chernikov assert msg.get_nla(IfaAttrType.IFA_LOCAL) is None 152d91f8db5SAlexander V. Chernikov assert msg.get_nla(IfaAttrType.IFA_BROADCAST) is None 153c1871a33SAlexander V. Chernikov 154c1871a33SAlexander V. Chernikov epair_ifname = self.vnet.iface_alias_map["if1"].name 155d91f8db5SAlexander V. Chernikov assert msg.get_nla(IfaAttrType.IFA_LABEL).text == epair_ifname 156c1871a33SAlexander V. Chernikov 157c1871a33SAlexander V. Chernikov # Local: fe80::/64 158c1871a33SAlexander V. Chernikov msg = lmsg 159c1871a33SAlexander V. Chernikov assert msg.base_hdr.ifa_prefixlen == 64 160c1871a33SAlexander V. Chernikov # Ignore IFA_FLAGS for now 161c1871a33SAlexander V. Chernikov assert msg.base_hdr.ifa_scope == RtScope.RT_SCOPE_LINK.value 162c1871a33SAlexander V. Chernikov 163d91f8db5SAlexander V. Chernikov addr = ipaddress.ip_address(msg.get_nla(IfaAttrType.IFA_ADDRESS).addr) 164c1871a33SAlexander V. Chernikov assert addr.is_link_local 165c1871a33SAlexander V. Chernikov # Verify that ifindex is not emmbedded 166c1871a33SAlexander V. Chernikov assert struct.unpack("!H", addr.packed[2:4])[0] == 0 167d91f8db5SAlexander V. Chernikov assert msg.get_nla(IfaAttrType.IFA_LOCAL) is None 168d91f8db5SAlexander V. Chernikov assert msg.get_nla(IfaAttrType.IFA_BROADCAST) is None 169c1871a33SAlexander V. Chernikov 170c1871a33SAlexander V. Chernikov epair_ifname = self.vnet.iface_alias_map["if1"].name 171d91f8db5SAlexander V. Chernikov assert msg.get_nla(IfaAttrType.IFA_LABEL).text == epair_ifname 17210b94e40SAlexander V. Chernikov 17310b94e40SAlexander V. Chernikov 17410b94e40SAlexander V. Chernikovclass RtnlIfaOps(NetlinkTestTemplate, SingleVnetTestTemplate): 17510b94e40SAlexander V. Chernikov def setup_method(self, method): 17610b94e40SAlexander V. Chernikov super().setup_method(method) 17710b94e40SAlexander V. Chernikov self.setup_netlink(NlConst.NETLINK_ROUTE) 17810b94e40SAlexander V. Chernikov 17910b94e40SAlexander V. Chernikov def send_check_success(self, msg): 18010b94e40SAlexander V. Chernikov rx_msg = self.get_reply(msg) 18110b94e40SAlexander V. Chernikov assert rx_msg.is_type(NlMsgType.NLMSG_ERROR) 18210b94e40SAlexander V. Chernikov assert rx_msg.error_code == 0 18310b94e40SAlexander V. Chernikov 18410b94e40SAlexander V. Chernikov @staticmethod 18510b94e40SAlexander V. Chernikov def get_family_from_ip(ip): 18610b94e40SAlexander V. Chernikov if ip.version == 4: 18710b94e40SAlexander V. Chernikov return socket.AF_INET 18810b94e40SAlexander V. Chernikov return socket.AF_INET6 18910b94e40SAlexander V. Chernikov 19010b94e40SAlexander V. Chernikov def create_msg(self, ifa): 19110b94e40SAlexander V. Chernikov iface = self.vnet.iface_alias_map["if1"] 19210b94e40SAlexander V. Chernikov 19310b94e40SAlexander V. Chernikov msg = NetlinkIfaMessage(self.helper, NlRtMsgType.RTM_NEWADDR.value) 19410b94e40SAlexander V. Chernikov msg.set_request() 19510b94e40SAlexander V. Chernikov msg.nl_hdr.nlmsg_flags |= ( 19610b94e40SAlexander V. Chernikov NlmNewFlags.NLM_F_EXCL.value | NlmNewFlags.NLM_F_CREATE.value 19710b94e40SAlexander V. Chernikov ) 19810b94e40SAlexander V. Chernikov msg.base_hdr.ifa_family = self.get_family_from_ip(ifa.ip) 19910b94e40SAlexander V. Chernikov msg.base_hdr.ifa_index = iface.ifindex 20010b94e40SAlexander V. Chernikov msg.base_hdr.ifa_prefixlen = ifa.network.prefixlen 20110b94e40SAlexander V. Chernikov return msg 20210b94e40SAlexander V. Chernikov 20310b94e40SAlexander V. Chernikov def get_ifa_list(self, ifindex=0, family=0): 20410b94e40SAlexander V. Chernikov msg = NetlinkIfaMessage(self.helper, NlRtMsgType.RTM_GETADDR.value) 20510b94e40SAlexander V. Chernikov msg.set_request() 20610b94e40SAlexander V. Chernikov msg.base_hdr.ifa_family = family 20710b94e40SAlexander V. Chernikov msg.base_hdr.ifa_index = ifindex 20810b94e40SAlexander V. Chernikov self.write_message(msg) 20910b94e40SAlexander V. Chernikov return self.read_msg_list(msg.nl_hdr.nlmsg_seq, NlRtMsgType.RTM_NEWADDR) 21010b94e40SAlexander V. Chernikov 21110b94e40SAlexander V. Chernikov def find_msg_by_ifa(self, msg_list, ip): 21210b94e40SAlexander V. Chernikov for msg in msg_list: 21310b94e40SAlexander V. Chernikov if msg.get_nla(IfaAttrType.IFA_ADDRESS).addr == str(ip): 21410b94e40SAlexander V. Chernikov return msg 21510b94e40SAlexander V. Chernikov return None 21610b94e40SAlexander V. Chernikov 21710b94e40SAlexander V. Chernikov def setup_dummy_carp(self, ifindex: int, vhid: int): 21810b94e40SAlexander V. Chernikov self.require_module("carp") 21910b94e40SAlexander V. Chernikov 22010b94e40SAlexander V. Chernikov nlsock = Nlsock(NlConst.NETLINK_GENERIC, self.helper) 22110b94e40SAlexander V. Chernikov family_id = nlsock.get_genl_family_id("carp") 22210b94e40SAlexander V. Chernikov 22310b94e40SAlexander V. Chernikov msg = CarpGenMessage(self.helper, family_id, CarpMsgType.CARP_NL_CMD_SET) 22410b94e40SAlexander V. Chernikov msg.set_request() 22510b94e40SAlexander V. Chernikov msg.add_nla(NlAttrU32(CarpAttrType.CARP_NL_VHID, vhid)) 22610b94e40SAlexander V. Chernikov msg.add_nla(NlAttrU32(CarpAttrType.CARP_NL_IFINDEX, ifindex)) 22710b94e40SAlexander V. Chernikov rx_msg = nlsock.get_reply(msg) 22810b94e40SAlexander V. Chernikov 22910b94e40SAlexander V. Chernikov assert rx_msg.is_type(NlMsgType.NLMSG_ERROR) 23010b94e40SAlexander V. Chernikov assert rx_msg.error_code == 0 23110b94e40SAlexander V. Chernikov 23210b94e40SAlexander V. Chernikov 23310b94e40SAlexander V. Chernikovclass TestRtNlIfaddrOpsBroadcast(RtnlIfaOps): 23410b94e40SAlexander V. Chernikov def test_add_4(self): 23510b94e40SAlexander V. Chernikov """Tests IPv4 address addition to the standard broadcast interface""" 23610b94e40SAlexander V. Chernikov ifa = ipaddress.ip_interface("192.0.2.1/24") 23710b94e40SAlexander V. Chernikov ifa_brd = ifa.network.broadcast_address 23810b94e40SAlexander V. Chernikov iface = self.vnet.iface_alias_map["if1"] 23910b94e40SAlexander V. Chernikov 24010b94e40SAlexander V. Chernikov msg = self.create_msg(ifa) 24110b94e40SAlexander V. Chernikov msg.add_nla(NlAttrIp(IfaAttrType.IFA_LOCAL, str(ifa.ip))) 24210b94e40SAlexander V. Chernikov msg.add_nla(NlAttrIp(IfaAttrType.IFA_BROADCAST, str(ifa_brd))) 24310b94e40SAlexander V. Chernikov 24410b94e40SAlexander V. Chernikov self.send_check_success(msg) 24510b94e40SAlexander V. Chernikov 24610b94e40SAlexander V. Chernikov lst = self.get_ifa_list(iface.ifindex, self.get_family_from_ip(ifa.ip)) 24710b94e40SAlexander V. Chernikov assert len(lst) == 1 24810b94e40SAlexander V. Chernikov rx_msg = lst[0] 24910b94e40SAlexander V. Chernikov 25010b94e40SAlexander V. Chernikov assert rx_msg.base_hdr.ifa_prefixlen == ifa.network.prefixlen 25110b94e40SAlexander V. Chernikov assert rx_msg.base_hdr.ifa_scope == RtScope.RT_SCOPE_UNIVERSE.value 25210b94e40SAlexander V. Chernikov 25310b94e40SAlexander V. Chernikov assert rx_msg.get_nla(IfaAttrType.IFA_ADDRESS).addr == str(ifa.ip) 25410b94e40SAlexander V. Chernikov assert rx_msg.get_nla(IfaAttrType.IFA_LOCAL).addr == str(ifa.ip) 25510b94e40SAlexander V. Chernikov assert rx_msg.get_nla(IfaAttrType.IFA_BROADCAST).addr == str(ifa_brd) 25610b94e40SAlexander V. Chernikov 2577eee0eafSAlexander V. Chernikov @pytest.mark.parametrize( 2587eee0eafSAlexander V. Chernikov "brd", 2597eee0eafSAlexander V. Chernikov [ 2607eee0eafSAlexander V. Chernikov pytest.param((32, True, "192.0.2.1"), id="auto_32"), 2617eee0eafSAlexander V. Chernikov pytest.param((31, True, "255.255.255.255"), id="auto_31"), 2627eee0eafSAlexander V. Chernikov pytest.param((30, True, "192.0.2.3"), id="auto_30"), 2637eee0eafSAlexander V. Chernikov pytest.param((30, False, "192.0.2.2"), id="custom_30"), 2647eee0eafSAlexander V. Chernikov pytest.param((24, False, "192.0.2.7"), id="custom_24"), 2657eee0eafSAlexander V. Chernikov ], 2667eee0eafSAlexander V. Chernikov ) 2677eee0eafSAlexander V. Chernikov def test_add_4_brd(self, brd): 2687eee0eafSAlexander V. Chernikov """Tests proper broadcast setup when adding IPv4 ifa""" 2697eee0eafSAlexander V. Chernikov plen, auto_brd, ifa_brd_str = brd 2707eee0eafSAlexander V. Chernikov ifa = ipaddress.ip_interface("192.0.2.1/{}".format(plen)) 2717eee0eafSAlexander V. Chernikov iface = self.vnet.iface_alias_map["if1"] 2727eee0eafSAlexander V. Chernikov ifa_brd = ipaddress.ip_address(ifa_brd_str) 2737eee0eafSAlexander V. Chernikov 2747eee0eafSAlexander V. Chernikov msg = self.create_msg(ifa) 2757eee0eafSAlexander V. Chernikov msg.add_nla(NlAttrIp(IfaAttrType.IFA_LOCAL, str(ifa.ip))) 2767eee0eafSAlexander V. Chernikov if not auto_brd: 2777eee0eafSAlexander V. Chernikov msg.add_nla(NlAttrIp(IfaAttrType.IFA_BROADCAST, str(ifa_brd))) 2787eee0eafSAlexander V. Chernikov 2797eee0eafSAlexander V. Chernikov self.send_check_success(msg) 2807eee0eafSAlexander V. Chernikov 2817eee0eafSAlexander V. Chernikov lst = self.get_ifa_list(iface.ifindex, self.get_family_from_ip(ifa.ip)) 2827eee0eafSAlexander V. Chernikov assert len(lst) == 1 2837eee0eafSAlexander V. Chernikov rx_msg = lst[0] 2847eee0eafSAlexander V. Chernikov 2857eee0eafSAlexander V. Chernikov assert rx_msg.base_hdr.ifa_prefixlen == ifa.network.prefixlen 2867eee0eafSAlexander V. Chernikov assert rx_msg.base_hdr.ifa_scope == RtScope.RT_SCOPE_UNIVERSE.value 2877eee0eafSAlexander V. Chernikov 2887eee0eafSAlexander V. Chernikov assert rx_msg.get_nla(IfaAttrType.IFA_ADDRESS).addr == str(ifa.ip) 2897eee0eafSAlexander V. Chernikov assert rx_msg.get_nla(IfaAttrType.IFA_LOCAL).addr == str(ifa.ip) 2907eee0eafSAlexander V. Chernikov assert rx_msg.get_nla(IfaAttrType.IFA_BROADCAST).addr == str(ifa_brd) 2917eee0eafSAlexander V. Chernikov 29210b94e40SAlexander V. Chernikov def test_add_6(self): 29310b94e40SAlexander V. Chernikov ifa = ipaddress.ip_interface("2001:db8::1/64") 29410b94e40SAlexander V. Chernikov iface = self.vnet.iface_alias_map["if1"] 29510b94e40SAlexander V. Chernikov 29610b94e40SAlexander V. Chernikov msg = self.create_msg(ifa) 29710b94e40SAlexander V. Chernikov msg.add_nla(NlAttrIp(IfaAttrType.IFA_LOCAL, str(ifa.ip))) 29810b94e40SAlexander V. Chernikov 29910b94e40SAlexander V. Chernikov self.send_check_success(msg) 30010b94e40SAlexander V. Chernikov 30110b94e40SAlexander V. Chernikov lst = self.get_ifa_list(iface.ifindex, self.get_family_from_ip(ifa.ip)) 30210b94e40SAlexander V. Chernikov assert len(lst) == 2 30310b94e40SAlexander V. Chernikov rx_msg_gu = self.find_msg_by_ifa(lst, ifa.ip) 30410b94e40SAlexander V. Chernikov assert rx_msg_gu is not None 30510b94e40SAlexander V. Chernikov 30610b94e40SAlexander V. Chernikov assert rx_msg_gu.base_hdr.ifa_prefixlen == ifa.network.prefixlen 30710b94e40SAlexander V. Chernikov assert rx_msg_gu.base_hdr.ifa_scope == RtScope.RT_SCOPE_UNIVERSE.value 30810b94e40SAlexander V. Chernikov assert rx_msg_gu.get_nla(IfaAttrType.IFA_ADDRESS).addr == str(ifa.ip) 30910b94e40SAlexander V. Chernikov 31010b94e40SAlexander V. Chernikov def test_add_4_carp(self): 31110b94e40SAlexander V. Chernikov ifa = ipaddress.ip_interface("192.0.2.1/24") 31210b94e40SAlexander V. Chernikov ifa_brd = ifa.network.broadcast_address 31310b94e40SAlexander V. Chernikov iface = self.vnet.iface_alias_map["if1"] 31410b94e40SAlexander V. Chernikov vhid = 77 31510b94e40SAlexander V. Chernikov 31610b94e40SAlexander V. Chernikov self.setup_dummy_carp(iface.ifindex, vhid) 31710b94e40SAlexander V. Chernikov 31810b94e40SAlexander V. Chernikov msg = self.create_msg(ifa) 31910b94e40SAlexander V. Chernikov msg.add_nla(NlAttrIp(IfaAttrType.IFA_LOCAL, str(ifa.ip))) 32010b94e40SAlexander V. Chernikov msg.add_nla(NlAttrIp(IfaAttrType.IFA_BROADCAST, str(ifa_brd))) 32110b94e40SAlexander V. Chernikov attrs_bsd = [NlAttrU32(IfafAttrType.IFAF_VHID, vhid)] 32210b94e40SAlexander V. Chernikov msg.add_nla(NlAttrNested(IfaAttrType.IFA_FREEBSD, attrs_bsd)) 32310b94e40SAlexander V. Chernikov 32410b94e40SAlexander V. Chernikov self.send_check_success(msg) 32510b94e40SAlexander V. Chernikov 32610b94e40SAlexander V. Chernikov lst = self.get_ifa_list(iface.ifindex, self.get_family_from_ip(ifa.ip)) 32710b94e40SAlexander V. Chernikov assert len(lst) == 1 32810b94e40SAlexander V. Chernikov rx_msg = lst[0] 32910b94e40SAlexander V. Chernikov 33010b94e40SAlexander V. Chernikov assert rx_msg.base_hdr.ifa_prefixlen == ifa.network.prefixlen 33110b94e40SAlexander V. Chernikov assert rx_msg.base_hdr.ifa_scope == RtScope.RT_SCOPE_UNIVERSE.value 33210b94e40SAlexander V. Chernikov 33310b94e40SAlexander V. Chernikov assert rx_msg.get_nla(IfaAttrType.IFA_ADDRESS).addr == str(ifa.ip) 33410b94e40SAlexander V. Chernikov assert rx_msg.get_nla(IfaAttrType.IFA_LOCAL).addr == str(ifa.ip) 33510b94e40SAlexander V. Chernikov assert rx_msg.get_nla(IfaAttrType.IFA_BROADCAST).addr == str(ifa_brd) 33610b94e40SAlexander V. Chernikov ifa_bsd = rx_msg.get_nla(IfaAttrType.IFA_FREEBSD) 33710b94e40SAlexander V. Chernikov assert ifa_bsd.get_nla(IfafAttrType.IFAF_VHID).u32 == vhid 33810b94e40SAlexander V. Chernikov 33910b94e40SAlexander V. Chernikov def test_add_6_carp(self): 34010b94e40SAlexander V. Chernikov ifa = ipaddress.ip_interface("2001:db8::1/64") 34110b94e40SAlexander V. Chernikov iface = self.vnet.iface_alias_map["if1"] 34210b94e40SAlexander V. Chernikov vhid = 77 34310b94e40SAlexander V. Chernikov 34410b94e40SAlexander V. Chernikov self.setup_dummy_carp(iface.ifindex, vhid) 34510b94e40SAlexander V. Chernikov 34610b94e40SAlexander V. Chernikov msg = self.create_msg(ifa) 34710b94e40SAlexander V. Chernikov msg.add_nla(NlAttrIp(IfaAttrType.IFA_LOCAL, str(ifa.ip))) 34810b94e40SAlexander V. Chernikov attrs_bsd = [NlAttrU32(IfafAttrType.IFAF_VHID, vhid)] 34910b94e40SAlexander V. Chernikov msg.add_nla(NlAttrNested(IfaAttrType.IFA_FREEBSD, attrs_bsd)) 35010b94e40SAlexander V. Chernikov 35110b94e40SAlexander V. Chernikov self.send_check_success(msg) 35210b94e40SAlexander V. Chernikov 35310b94e40SAlexander V. Chernikov lst = self.get_ifa_list(iface.ifindex, self.get_family_from_ip(ifa.ip)) 35410b94e40SAlexander V. Chernikov assert len(lst) == 2 35510b94e40SAlexander V. Chernikov rx_msg_gu = self.find_msg_by_ifa(lst, ifa.ip) 35610b94e40SAlexander V. Chernikov assert rx_msg_gu is not None 35710b94e40SAlexander V. Chernikov 35810b94e40SAlexander V. Chernikov assert rx_msg_gu.base_hdr.ifa_prefixlen == ifa.network.prefixlen 35910b94e40SAlexander V. Chernikov assert rx_msg_gu.base_hdr.ifa_scope == RtScope.RT_SCOPE_UNIVERSE.value 36010b94e40SAlexander V. Chernikov assert rx_msg_gu.get_nla(IfaAttrType.IFA_ADDRESS).addr == str(ifa.ip) 36110b94e40SAlexander V. Chernikov ifa_bsd = rx_msg_gu.get_nla(IfaAttrType.IFA_FREEBSD) 36210b94e40SAlexander V. Chernikov assert ifa_bsd.get_nla(IfafAttrType.IFAF_VHID).u32 == vhid 36310b94e40SAlexander V. Chernikov 36410b94e40SAlexander V. Chernikov def test_add_6_lifetime(self): 36510b94e40SAlexander V. Chernikov ifa = ipaddress.ip_interface("2001:db8::1/64") 36610b94e40SAlexander V. Chernikov iface = self.vnet.iface_alias_map["if1"] 36710b94e40SAlexander V. Chernikov pref_time = 43200 36810b94e40SAlexander V. Chernikov valid_time = 86400 36910b94e40SAlexander V. Chernikov 37010b94e40SAlexander V. Chernikov ci = IfaCacheInfo(ifa_prefered=pref_time, ifa_valid=valid_time) 37110b94e40SAlexander V. Chernikov 37210b94e40SAlexander V. Chernikov msg = self.create_msg(ifa) 37310b94e40SAlexander V. Chernikov msg.add_nla(NlAttrIp(IfaAttrType.IFA_LOCAL, str(ifa.ip))) 37410b94e40SAlexander V. Chernikov msg.add_nla(NlAttr(IfaAttrType.IFA_CACHEINFO, bytes(ci))) 37510b94e40SAlexander V. Chernikov 37610b94e40SAlexander V. Chernikov self.send_check_success(msg) 37710b94e40SAlexander V. Chernikov 37810b94e40SAlexander V. Chernikov lst = self.get_ifa_list(iface.ifindex, self.get_family_from_ip(ifa.ip)) 37910b94e40SAlexander V. Chernikov assert len(lst) == 2 38010b94e40SAlexander V. Chernikov rx_msg = self.find_msg_by_ifa(lst, ifa.ip) 38110b94e40SAlexander V. Chernikov assert rx_msg is not None 38210b94e40SAlexander V. Chernikov 38310b94e40SAlexander V. Chernikov ci = rx_msg.get_nla(IfaAttrType.IFA_CACHEINFO).ci 38410b94e40SAlexander V. Chernikov assert pref_time - 5 <= ci.ifa_prefered <= pref_time 38510b94e40SAlexander V. Chernikov assert valid_time - 5 <= ci.ifa_valid <= valid_time 38610b94e40SAlexander V. Chernikov assert ci.cstamp > 0 38710b94e40SAlexander V. Chernikov assert ci.tstamp > 0 38810b94e40SAlexander V. Chernikov assert ci.tstamp >= ci.cstamp 38910b94e40SAlexander V. Chernikov 39010b94e40SAlexander V. Chernikov @pytest.mark.parametrize( 39110b94e40SAlexander V. Chernikov "flags_str", 39210b94e40SAlexander V. Chernikov [ 39310b94e40SAlexander V. Chernikov "autoconf", 39410b94e40SAlexander V. Chernikov "deprecated", 39510b94e40SAlexander V. Chernikov "autoconf,deprecated", 39610b94e40SAlexander V. Chernikov "prefer_source", 39710b94e40SAlexander V. Chernikov ], 39810b94e40SAlexander V. Chernikov ) 39910b94e40SAlexander V. Chernikov def test_add_6_flags(self, flags_str): 40010b94e40SAlexander V. Chernikov ifa = ipaddress.ip_interface("2001:db8::1/64") 40110b94e40SAlexander V. Chernikov iface = self.vnet.iface_alias_map["if1"] 40210b94e40SAlexander V. Chernikov 40310b94e40SAlexander V. Chernikov flags_map = { 40410b94e40SAlexander V. Chernikov "autoconf": {"nl": 0, "f": IfafFlags6.IN6_IFF_AUTOCONF}, 40510b94e40SAlexander V. Chernikov "deprecated": { 40610b94e40SAlexander V. Chernikov "nl": IfaFlags.IFA_F_DEPRECATED, 40710b94e40SAlexander V. Chernikov "f": IfafFlags6.IN6_IFF_DEPRECATED, 40810b94e40SAlexander V. Chernikov }, 40910b94e40SAlexander V. Chernikov "prefer_source": {"nl": 0, "f": IfafFlags6.IN6_IFF_PREFER_SOURCE}, 41010b94e40SAlexander V. Chernikov } 41110b94e40SAlexander V. Chernikov nl_flags = 0 41210b94e40SAlexander V. Chernikov f_flags = 0 41310b94e40SAlexander V. Chernikov 41410b94e40SAlexander V. Chernikov for flag_str in flags_str.split(","): 41510b94e40SAlexander V. Chernikov d = flags_map.get(flag_str, {}) 41610b94e40SAlexander V. Chernikov nl_flags |= enum_or_int(d.get("nl", 0)) 41710b94e40SAlexander V. Chernikov f_flags |= enum_or_int(d.get("f", 0)) 41810b94e40SAlexander V. Chernikov 41910b94e40SAlexander V. Chernikov msg = self.create_msg(ifa) 42010b94e40SAlexander V. Chernikov msg.add_nla(NlAttrIp(IfaAttrType.IFA_LOCAL, str(ifa.ip))) 42110b94e40SAlexander V. Chernikov msg.add_nla(NlAttrU32(IfaAttrType.IFA_FLAGS, nl_flags)) 42210b94e40SAlexander V. Chernikov attrs_bsd = [NlAttrU32(IfafAttrType.IFAF_FLAGS, f_flags)] 42310b94e40SAlexander V. Chernikov msg.add_nla(NlAttrNested(IfaAttrType.IFA_FREEBSD, attrs_bsd)) 42410b94e40SAlexander V. Chernikov 42510b94e40SAlexander V. Chernikov self.send_check_success(msg) 42610b94e40SAlexander V. Chernikov 42710b94e40SAlexander V. Chernikov lst = self.get_ifa_list(iface.ifindex, self.get_family_from_ip(ifa.ip)) 42810b94e40SAlexander V. Chernikov assert len(lst) == 2 42910b94e40SAlexander V. Chernikov rx_msg = self.find_msg_by_ifa(lst, ifa.ip) 43010b94e40SAlexander V. Chernikov assert rx_msg is not None 43110b94e40SAlexander V. Chernikov 43210b94e40SAlexander V. Chernikov assert rx_msg.get_nla(IfaAttrType.IFA_FLAGS).u32 == nl_flags 43310b94e40SAlexander V. Chernikov ifa_bsd = rx_msg.get_nla(IfaAttrType.IFA_FREEBSD) 43410b94e40SAlexander V. Chernikov assert ifa_bsd.get_nla(IfafAttrType.IFAF_FLAGS).u32 == f_flags 43510b94e40SAlexander V. Chernikov 43610b94e40SAlexander V. Chernikov def test_add_4_empty_message(self): 43710b94e40SAlexander V. Chernikov """Tests correct failure w/ empty message""" 43810b94e40SAlexander V. Chernikov msg = NetlinkIfaMessage(self.helper, NlRtMsgType.RTM_NEWADDR.value) 43910b94e40SAlexander V. Chernikov msg.set_request() 44010b94e40SAlexander V. Chernikov msg.nl_hdr.nlmsg_flags |= ( 44110b94e40SAlexander V. Chernikov NlmNewFlags.NLM_F_EXCL.value | NlmNewFlags.NLM_F_CREATE.value 44210b94e40SAlexander V. Chernikov ) 44310b94e40SAlexander V. Chernikov 44410b94e40SAlexander V. Chernikov rx_msg = self.get_reply(msg) 44510b94e40SAlexander V. Chernikov assert rx_msg.is_type(NlMsgType.NLMSG_ERROR) 44610b94e40SAlexander V. Chernikov assert rx_msg.error_code != 0 44710b94e40SAlexander V. Chernikov 44810b94e40SAlexander V. Chernikov def test_add_4_empty_ifindex(self): 44910b94e40SAlexander V. Chernikov """Tests correct failure w/ empty ifindex""" 45010b94e40SAlexander V. Chernikov ifa = ipaddress.ip_interface("192.0.2.1/24") 45110b94e40SAlexander V. Chernikov ifa_brd = ifa.network.broadcast_address 45210b94e40SAlexander V. Chernikov 45310b94e40SAlexander V. Chernikov msg = self.create_msg(ifa) 45410b94e40SAlexander V. Chernikov msg.base_hdr.ifa_index = 0 45510b94e40SAlexander V. Chernikov msg.add_nla(NlAttrIp(IfaAttrType.IFA_LOCAL, str(ifa.ip))) 45610b94e40SAlexander V. Chernikov msg.add_nla(NlAttrIp(IfaAttrType.IFA_BROADCAST, str(ifa_brd))) 45710b94e40SAlexander V. Chernikov 45810b94e40SAlexander V. Chernikov rx_msg = self.get_reply(msg) 45910b94e40SAlexander V. Chernikov assert rx_msg.is_type(NlMsgType.NLMSG_ERROR) 46010b94e40SAlexander V. Chernikov assert rx_msg.error_code != 0 46110b94e40SAlexander V. Chernikov 46210b94e40SAlexander V. Chernikov def test_add_4_empty_addr(self): 46310b94e40SAlexander V. Chernikov """Tests correct failure w/ empty address""" 46410b94e40SAlexander V. Chernikov ifa = ipaddress.ip_interface("192.0.2.1/24") 46510b94e40SAlexander V. Chernikov ifa_brd = ifa.network.broadcast_address 46610b94e40SAlexander V. Chernikov 46710b94e40SAlexander V. Chernikov msg = self.create_msg(ifa) 46810b94e40SAlexander V. Chernikov msg.add_nla(NlAttrIp(IfaAttrType.IFA_BROADCAST, str(ifa_brd))) 46910b94e40SAlexander V. Chernikov 47010b94e40SAlexander V. Chernikov rx_msg = self.get_reply(msg) 47110b94e40SAlexander V. Chernikov assert rx_msg.is_type(NlMsgType.NLMSG_ERROR) 47210b94e40SAlexander V. Chernikov assert rx_msg.error_code != 0 47310b94e40SAlexander V. Chernikov 47410b94e40SAlexander V. Chernikov @pytest.mark.parametrize( 47510b94e40SAlexander V. Chernikov "ifa_str", 47610b94e40SAlexander V. Chernikov [ 47710b94e40SAlexander V. Chernikov pytest.param("192.0.2.1/32", id="ipv4_host"), 47810b94e40SAlexander V. Chernikov pytest.param("192.0.2.1/24", id="ipv4_prefix"), 47910b94e40SAlexander V. Chernikov pytest.param("2001:db8::1/64", id="ipv6_gu_prefix"), 48010b94e40SAlexander V. Chernikov pytest.param("2001:db8::1/128", id="ipv6_gu_host"), 48110b94e40SAlexander V. Chernikov ], 48210b94e40SAlexander V. Chernikov ) 48310b94e40SAlexander V. Chernikov @pytest.mark.parametrize( 48410b94e40SAlexander V. Chernikov "tlv", 48510b94e40SAlexander V. Chernikov [ 48610b94e40SAlexander V. Chernikov pytest.param("local", id="ifa_local"), 48710b94e40SAlexander V. Chernikov pytest.param("address", id="ifa_address"), 48810b94e40SAlexander V. Chernikov ], 48910b94e40SAlexander V. Chernikov ) 49010b94e40SAlexander V. Chernikov def test_del(self, tlv, ifa_str): 49110b94e40SAlexander V. Chernikov """Tests address deletion from the standard broadcast interface""" 49210b94e40SAlexander V. Chernikov ifa = ipaddress.ip_interface(ifa_str) 49310b94e40SAlexander V. Chernikov ifa_brd = ifa.network.broadcast_address 49410b94e40SAlexander V. Chernikov iface = self.vnet.iface_alias_map["if1"] 49510b94e40SAlexander V. Chernikov 49610b94e40SAlexander V. Chernikov msg = self.create_msg(ifa) 49710b94e40SAlexander V. Chernikov msg.add_nla(NlAttrIp(IfaAttrType.IFA_LOCAL, str(ifa.ip))) 49810b94e40SAlexander V. Chernikov msg.add_nla(NlAttrIp(IfaAttrType.IFA_BROADCAST, str(ifa_brd))) 49910b94e40SAlexander V. Chernikov 50010b94e40SAlexander V. Chernikov self.send_check_success(msg) 50110b94e40SAlexander V. Chernikov lst = self.get_ifa_list(iface.ifindex, self.get_family_from_ip(ifa.ip)) 50210b94e40SAlexander V. Chernikov rx_msg = self.find_msg_by_ifa(lst, ifa.ip) 50310b94e40SAlexander V. Chernikov assert rx_msg is not None 50410b94e40SAlexander V. Chernikov 50510b94e40SAlexander V. Chernikov msg = NetlinkIfaMessage(self.helper, NlRtMsgType.RTM_DELADDR.value) 50610b94e40SAlexander V. Chernikov msg.set_request() 50710b94e40SAlexander V. Chernikov msg.base_hdr.ifa_family = self.get_family_from_ip(ifa.ip) 50810b94e40SAlexander V. Chernikov msg.base_hdr.ifa_index = iface.ifindex 50910b94e40SAlexander V. Chernikov msg.base_hdr.ifa_prefixlen = ifa.network.prefixlen 51010b94e40SAlexander V. Chernikov 51110b94e40SAlexander V. Chernikov if tlv == "local": 51210b94e40SAlexander V. Chernikov msg.add_nla(NlAttrIp(IfaAttrType.IFA_LOCAL, str(ifa.ip))) 51310b94e40SAlexander V. Chernikov if tlv == "address": 51410b94e40SAlexander V. Chernikov msg.add_nla(NlAttrIp(IfaAttrType.IFA_ADDRESS, str(ifa.ip))) 51510b94e40SAlexander V. Chernikov 51610b94e40SAlexander V. Chernikov self.send_check_success(msg) 51710b94e40SAlexander V. Chernikov lst = self.get_ifa_list(iface.ifindex, self.get_family_from_ip(ifa.ip)) 51810b94e40SAlexander V. Chernikov rx_msg = self.find_msg_by_ifa(lst, ifa.ip) 51910b94e40SAlexander V. Chernikov assert rx_msg is None 52010b94e40SAlexander V. Chernikov 52110b94e40SAlexander V. Chernikov 52210b94e40SAlexander V. Chernikovclass TestRtNlIfaddrOpsP2p(RtnlIfaOps): 52310b94e40SAlexander V. Chernikov IFTYPE = "gif" 52410b94e40SAlexander V. Chernikov 52510b94e40SAlexander V. Chernikov @pytest.mark.parametrize( 52610b94e40SAlexander V. Chernikov "ifa_pair", 52710b94e40SAlexander V. Chernikov [ 52810b94e40SAlexander V. Chernikov pytest.param(["192.0.2.1/24", "192.0.2.2"], id="dst_inside_24"), 52910b94e40SAlexander V. Chernikov pytest.param(["192.0.2.1/30", "192.0.2.2"], id="dst_inside_30"), 53010b94e40SAlexander V. Chernikov pytest.param(["192.0.2.1/31", "192.0.2.2"], id="dst_inside_31"), 53110b94e40SAlexander V. Chernikov pytest.param(["192.0.2.1/32", "192.0.2.2"], id="dst_outside_32"), 53210b94e40SAlexander V. Chernikov pytest.param(["192.0.2.1/30", "192.0.2.100"], id="dst_outside_30"), 53310b94e40SAlexander V. Chernikov ], 53410b94e40SAlexander V. Chernikov ) 53510b94e40SAlexander V. Chernikov def test_add_4(self, ifa_pair): 53610b94e40SAlexander V. Chernikov """Tests IPv4 address addition to the p2p interface""" 53710b94e40SAlexander V. Chernikov ifa = ipaddress.ip_interface(ifa_pair[0]) 53810b94e40SAlexander V. Chernikov peer_ip = ipaddress.ip_address(ifa_pair[1]) 53910b94e40SAlexander V. Chernikov iface = self.vnet.iface_alias_map["if1"] 54010b94e40SAlexander V. Chernikov 54110b94e40SAlexander V. Chernikov msg = self.create_msg(ifa) 54210b94e40SAlexander V. Chernikov msg.add_nla(NlAttrIp(IfaAttrType.IFA_LOCAL, str(ifa.ip))) 54310b94e40SAlexander V. Chernikov msg.add_nla(NlAttrIp(IfaAttrType.IFA_ADDRESS, str(peer_ip))) 54410b94e40SAlexander V. Chernikov 54510b94e40SAlexander V. Chernikov self.send_check_success(msg) 54610b94e40SAlexander V. Chernikov 54710b94e40SAlexander V. Chernikov lst = self.get_ifa_list(iface.ifindex, self.get_family_from_ip(ifa.ip)) 54810b94e40SAlexander V. Chernikov assert len(lst) == 1 54910b94e40SAlexander V. Chernikov rx_msg = lst[0] 55010b94e40SAlexander V. Chernikov 55110b94e40SAlexander V. Chernikov assert rx_msg.base_hdr.ifa_prefixlen == ifa.network.prefixlen 55210b94e40SAlexander V. Chernikov assert rx_msg.base_hdr.ifa_scope == RtScope.RT_SCOPE_UNIVERSE.value 55310b94e40SAlexander V. Chernikov 55410b94e40SAlexander V. Chernikov assert rx_msg.get_nla(IfaAttrType.IFA_LOCAL).addr == str(ifa.ip) 55510b94e40SAlexander V. Chernikov assert rx_msg.get_nla(IfaAttrType.IFA_ADDRESS).addr == str(peer_ip) 55610b94e40SAlexander V. Chernikov 55710b94e40SAlexander V. Chernikov @pytest.mark.parametrize( 55810b94e40SAlexander V. Chernikov "ifa_pair", 55910b94e40SAlexander V. Chernikov [ 56010b94e40SAlexander V. Chernikov pytest.param( 56110b94e40SAlexander V. Chernikov ["2001:db8::1/64", "2001:db8::2"], 56210b94e40SAlexander V. Chernikov id="dst_inside_64", 56310b94e40SAlexander V. Chernikov marks=pytest.mark.xfail(reason="currently fails"), 56410b94e40SAlexander V. Chernikov ), 56510b94e40SAlexander V. Chernikov pytest.param( 56610b94e40SAlexander V. Chernikov ["2001:db8::1/127", "2001:db8::2"], 56710b94e40SAlexander V. Chernikov id="dst_inside_127", 56810b94e40SAlexander V. Chernikov marks=pytest.mark.xfail(reason="currently fails"), 56910b94e40SAlexander V. Chernikov ), 57010b94e40SAlexander V. Chernikov pytest.param(["2001:db8::1/128", "2001:db8::2"], id="dst_outside_128"), 57110b94e40SAlexander V. Chernikov pytest.param( 57210b94e40SAlexander V. Chernikov ["2001:db8::1/64", "2001:db8:2::2"], 57310b94e40SAlexander V. Chernikov id="dst_outside_64", 57410b94e40SAlexander V. Chernikov marks=pytest.mark.xfail(reason="currently fails"), 57510b94e40SAlexander V. Chernikov ), 57610b94e40SAlexander V. Chernikov ], 57710b94e40SAlexander V. Chernikov ) 57810b94e40SAlexander V. Chernikov def test_add_6(self, ifa_pair): 57910b94e40SAlexander V. Chernikov """Tests IPv6 address addition to the p2p interface""" 58010b94e40SAlexander V. Chernikov ifa = ipaddress.ip_interface(ifa_pair[0]) 58110b94e40SAlexander V. Chernikov peer_ip = ipaddress.ip_address(ifa_pair[1]) 58210b94e40SAlexander V. Chernikov iface = self.vnet.iface_alias_map["if1"] 58310b94e40SAlexander V. Chernikov 58410b94e40SAlexander V. Chernikov msg = self.create_msg(ifa) 58510b94e40SAlexander V. Chernikov msg.add_nla(NlAttrIp(IfaAttrType.IFA_LOCAL, str(ifa.ip))) 58610b94e40SAlexander V. Chernikov msg.add_nla(NlAttrIp(IfaAttrType.IFA_ADDRESS, str(peer_ip))) 58710b94e40SAlexander V. Chernikov 58810b94e40SAlexander V. Chernikov self.send_check_success(msg) 58910b94e40SAlexander V. Chernikov 59010b94e40SAlexander V. Chernikov lst = self.get_ifa_list(iface.ifindex, self.get_family_from_ip(ifa.ip)) 59110b94e40SAlexander V. Chernikov assert len(lst) == 2 59210b94e40SAlexander V. Chernikov rx_msg_gu = self.find_msg_by_ifa(lst, peer_ip) 59310b94e40SAlexander V. Chernikov assert rx_msg_gu is not None 59410b94e40SAlexander V. Chernikov 59510b94e40SAlexander V. Chernikov assert rx_msg_gu.base_hdr.ifa_prefixlen == ifa.network.prefixlen 59610b94e40SAlexander V. Chernikov assert rx_msg_gu.base_hdr.ifa_scope == RtScope.RT_SCOPE_UNIVERSE.value 59710b94e40SAlexander V. Chernikov assert rx_msg_gu.get_nla(IfaAttrType.IFA_LOCAL).addr == str(ifa.ip) 59810b94e40SAlexander V. Chernikov assert rx_msg_gu.get_nla(IfaAttrType.IFA_ADDRESS).addr == str(peer_ip) 59910b94e40SAlexander V. Chernikov 60010b94e40SAlexander V. Chernikov @pytest.mark.parametrize( 60110b94e40SAlexander V. Chernikov "ifa_pair", 60210b94e40SAlexander V. Chernikov [ 60310b94e40SAlexander V. Chernikov pytest.param(["192.0.2.1/30", "192.0.2.2"], id="ipv4_dst_inside_30"), 60410b94e40SAlexander V. Chernikov pytest.param(["192.0.2.1/32", "192.0.2.2"], id="ipv4_dst_outside_32"), 60510b94e40SAlexander V. Chernikov pytest.param(["2001:db8::1/128", "2001:db8::2"], id="ip6_dst_outside_128"), 60610b94e40SAlexander V. Chernikov ], 60710b94e40SAlexander V. Chernikov ) 60810b94e40SAlexander V. Chernikov @pytest.mark.parametrize( 60910b94e40SAlexander V. Chernikov "tlv_pair", 61010b94e40SAlexander V. Chernikov [ 61110b94e40SAlexander V. Chernikov pytest.param(["a", ""], id="ifa_addr=addr"), 61210b94e40SAlexander V. Chernikov pytest.param(["", "a"], id="ifa_local=addr"), 61310b94e40SAlexander V. Chernikov pytest.param(["a", "a"], id="ifa_addr=addr,ifa_local=addr"), 61410b94e40SAlexander V. Chernikov ], 61510b94e40SAlexander V. Chernikov ) 61610b94e40SAlexander V. Chernikov def test_del(self, tlv_pair, ifa_pair): 61710b94e40SAlexander V. Chernikov """Tests address deletion from the P2P interface""" 61810b94e40SAlexander V. Chernikov ifa = ipaddress.ip_interface(ifa_pair[0]) 61910b94e40SAlexander V. Chernikov peer_ip = ipaddress.ip_address(ifa_pair[1]) 62010b94e40SAlexander V. Chernikov iface = self.vnet.iface_alias_map["if1"] 62110b94e40SAlexander V. Chernikov ifa_addr_str, ifa_local_str = tlv_pair 62210b94e40SAlexander V. Chernikov 62310b94e40SAlexander V. Chernikov msg = self.create_msg(ifa) 62410b94e40SAlexander V. Chernikov msg.add_nla(NlAttrIp(IfaAttrType.IFA_LOCAL, str(ifa.ip))) 62510b94e40SAlexander V. Chernikov msg.add_nla(NlAttrIp(IfaAttrType.IFA_ADDRESS, str(peer_ip))) 62610b94e40SAlexander V. Chernikov 62710b94e40SAlexander V. Chernikov self.send_check_success(msg) 62810b94e40SAlexander V. Chernikov lst = self.get_ifa_list(iface.ifindex, self.get_family_from_ip(ifa.ip)) 62910b94e40SAlexander V. Chernikov rx_msg = self.find_msg_by_ifa(lst, peer_ip) 63010b94e40SAlexander V. Chernikov assert rx_msg is not None 63110b94e40SAlexander V. Chernikov 63210b94e40SAlexander V. Chernikov msg = NetlinkIfaMessage(self.helper, NlRtMsgType.RTM_DELADDR.value) 63310b94e40SAlexander V. Chernikov msg.set_request() 63410b94e40SAlexander V. Chernikov msg.base_hdr.ifa_family = self.get_family_from_ip(ifa.ip) 63510b94e40SAlexander V. Chernikov msg.base_hdr.ifa_index = iface.ifindex 63610b94e40SAlexander V. Chernikov msg.base_hdr.ifa_prefixlen = ifa.network.prefixlen 63710b94e40SAlexander V. Chernikov 63810b94e40SAlexander V. Chernikov if "a" in ifa_addr_str: 63910b94e40SAlexander V. Chernikov msg.add_nla(NlAttrIp(IfaAttrType.IFA_ADDRESS, str(ifa.ip))) 64010b94e40SAlexander V. Chernikov if "p" in ifa_addr_str: 64110b94e40SAlexander V. Chernikov msg.add_nla(NlAttrIp(IfaAttrType.IFA_ADDRESS, str(peer_ip))) 64210b94e40SAlexander V. Chernikov if "a" in ifa_local_str: 64310b94e40SAlexander V. Chernikov msg.add_nla(NlAttrIp(IfaAttrType.IFA_LOCAL, str(ifa.ip))) 64410b94e40SAlexander V. Chernikov if "p" in ifa_local_str: 64510b94e40SAlexander V. Chernikov msg.add_nla(NlAttrIp(IfaAttrType.IFA_LOCAL, str(peer_ip))) 64610b94e40SAlexander V. Chernikov 64710b94e40SAlexander V. Chernikov self.send_check_success(msg) 64810b94e40SAlexander V. Chernikov lst = self.get_ifa_list(iface.ifindex, self.get_family_from_ip(ifa.ip)) 64910b94e40SAlexander V. Chernikov rx_msg = self.find_msg_by_ifa(lst, ifa.ip) 65010b94e40SAlexander V. Chernikov assert rx_msg is None 65110b94e40SAlexander V. Chernikov 65210b94e40SAlexander V. Chernikov 65310b94e40SAlexander V. Chernikovclass TestRtNlAddIfaddrLo(RtnlIfaOps): 65410b94e40SAlexander V. Chernikov IFTYPE = "lo" 65510b94e40SAlexander V. Chernikov 65610b94e40SAlexander V. Chernikov @pytest.mark.parametrize( 65710b94e40SAlexander V. Chernikov "ifa_str", 65810b94e40SAlexander V. Chernikov [ 65910b94e40SAlexander V. Chernikov pytest.param("192.0.2.1/24", id="prefix"), 66010b94e40SAlexander V. Chernikov pytest.param("192.0.2.1/32", id="host"), 66110b94e40SAlexander V. Chernikov ], 66210b94e40SAlexander V. Chernikov ) 66310b94e40SAlexander V. Chernikov def test_add_4(self, ifa_str): 66410b94e40SAlexander V. Chernikov """Tests IPv4 address addition to the loopback interface""" 66510b94e40SAlexander V. Chernikov ifa = ipaddress.ip_interface(ifa_str) 66610b94e40SAlexander V. Chernikov iface = self.vnet.iface_alias_map["if1"] 66710b94e40SAlexander V. Chernikov 66810b94e40SAlexander V. Chernikov msg = self.create_msg(ifa) 66910b94e40SAlexander V. Chernikov msg.add_nla(NlAttrIp(IfaAttrType.IFA_LOCAL, str(ifa.ip))) 67010b94e40SAlexander V. Chernikov 67110b94e40SAlexander V. Chernikov self.send_check_success(msg) 67210b94e40SAlexander V. Chernikov 67310b94e40SAlexander V. Chernikov lst = self.get_ifa_list(iface.ifindex, self.get_family_from_ip(ifa.ip)) 67410b94e40SAlexander V. Chernikov assert len(lst) == 1 67510b94e40SAlexander V. Chernikov rx_msg = lst[0] 67610b94e40SAlexander V. Chernikov 67710b94e40SAlexander V. Chernikov assert rx_msg.base_hdr.ifa_prefixlen == ifa.network.prefixlen 67810b94e40SAlexander V. Chernikov assert rx_msg.base_hdr.ifa_scope == RtScope.RT_SCOPE_UNIVERSE.value 67910b94e40SAlexander V. Chernikov 68010b94e40SAlexander V. Chernikov assert rx_msg.get_nla(IfaAttrType.IFA_ADDRESS).addr == str(ifa.ip) 68110b94e40SAlexander V. Chernikov assert rx_msg.get_nla(IfaAttrType.IFA_LOCAL).addr == str(ifa.ip) 68210b94e40SAlexander V. Chernikov 68310b94e40SAlexander V. Chernikov @pytest.mark.parametrize( 68410b94e40SAlexander V. Chernikov "ifa_str", 68510b94e40SAlexander V. Chernikov [ 68610b94e40SAlexander V. Chernikov pytest.param("2001:db8::1/64", id="gu_prefix"), 68710b94e40SAlexander V. Chernikov pytest.param("2001:db8::1/128", id="gu_host"), 68810b94e40SAlexander V. Chernikov ], 68910b94e40SAlexander V. Chernikov ) 69010b94e40SAlexander V. Chernikov def test_add_6(self, ifa_str): 69110b94e40SAlexander V. Chernikov """Tests IPv6 address addition to the loopback interface""" 69210b94e40SAlexander V. Chernikov ifa = ipaddress.ip_interface(ifa_str) 69310b94e40SAlexander V. Chernikov iface = self.vnet.iface_alias_map["if1"] 69410b94e40SAlexander V. Chernikov 69510b94e40SAlexander V. Chernikov msg = self.create_msg(ifa) 69610b94e40SAlexander V. Chernikov msg.add_nla(NlAttrIp(IfaAttrType.IFA_LOCAL, str(ifa.ip))) 69710b94e40SAlexander V. Chernikov 69810b94e40SAlexander V. Chernikov self.send_check_success(msg) 69910b94e40SAlexander V. Chernikov 70010b94e40SAlexander V. Chernikov lst = self.get_ifa_list(iface.ifindex, self.get_family_from_ip(ifa.ip)) 701*9247238cSAlexander V. Chernikov assert len(lst) == 2 # link-local should be auto-created as well 70210b94e40SAlexander V. Chernikov rx_msg = self.find_msg_by_ifa(lst, ifa.ip) 70310b94e40SAlexander V. Chernikov assert rx_msg is not None 70410b94e40SAlexander V. Chernikov 70510b94e40SAlexander V. Chernikov assert rx_msg.base_hdr.ifa_prefixlen == ifa.network.prefixlen 70610b94e40SAlexander V. Chernikov assert rx_msg.base_hdr.ifa_scope == RtScope.RT_SCOPE_UNIVERSE.value 70710b94e40SAlexander V. Chernikov assert rx_msg.get_nla(IfaAttrType.IFA_ADDRESS).addr == str(ifa.ip) 70810b94e40SAlexander V. Chernikov 70910b94e40SAlexander V. Chernikov @pytest.mark.parametrize( 71010b94e40SAlexander V. Chernikov "ifa_str", 71110b94e40SAlexander V. Chernikov [ 71210b94e40SAlexander V. Chernikov pytest.param("192.0.2.1/32", id="ipv4_host"), 71310b94e40SAlexander V. Chernikov pytest.param("192.0.2.1/24", id="ipv4_prefix"), 71410b94e40SAlexander V. Chernikov pytest.param("2001:db8::1/64", id="ipv6_gu_prefix"), 71510b94e40SAlexander V. Chernikov pytest.param("2001:db8::1/128", id="ipv6_gu_host"), 71610b94e40SAlexander V. Chernikov ], 71710b94e40SAlexander V. Chernikov ) 71810b94e40SAlexander V. Chernikov @pytest.mark.parametrize( 71910b94e40SAlexander V. Chernikov "tlv", 72010b94e40SAlexander V. Chernikov [ 72110b94e40SAlexander V. Chernikov pytest.param("local", id="ifa_local"), 72210b94e40SAlexander V. Chernikov pytest.param("address", id="ifa_address"), 72310b94e40SAlexander V. Chernikov ], 72410b94e40SAlexander V. Chernikov ) 72510b94e40SAlexander V. Chernikov def test_del(self, tlv, ifa_str): 72610b94e40SAlexander V. Chernikov """Tests address deletion from the loopback interface""" 72710b94e40SAlexander V. Chernikov ifa = ipaddress.ip_interface(ifa_str) 72810b94e40SAlexander V. Chernikov iface = self.vnet.iface_alias_map["if1"] 72910b94e40SAlexander V. Chernikov 73010b94e40SAlexander V. Chernikov msg = self.create_msg(ifa) 73110b94e40SAlexander V. Chernikov msg.add_nla(NlAttrIp(IfaAttrType.IFA_LOCAL, str(ifa.ip))) 73210b94e40SAlexander V. Chernikov 73310b94e40SAlexander V. Chernikov self.send_check_success(msg) 73410b94e40SAlexander V. Chernikov lst = self.get_ifa_list(iface.ifindex, self.get_family_from_ip(ifa.ip)) 73510b94e40SAlexander V. Chernikov rx_msg = self.find_msg_by_ifa(lst, ifa.ip) 73610b94e40SAlexander V. Chernikov assert rx_msg is not None 73710b94e40SAlexander V. Chernikov 73810b94e40SAlexander V. Chernikov msg = NetlinkIfaMessage(self.helper, NlRtMsgType.RTM_DELADDR.value) 73910b94e40SAlexander V. Chernikov msg.set_request() 74010b94e40SAlexander V. Chernikov msg.base_hdr.ifa_family = self.get_family_from_ip(ifa.ip) 74110b94e40SAlexander V. Chernikov msg.base_hdr.ifa_index = iface.ifindex 74210b94e40SAlexander V. Chernikov msg.base_hdr.ifa_prefixlen = ifa.network.prefixlen 74310b94e40SAlexander V. Chernikov 74410b94e40SAlexander V. Chernikov if tlv == "local": 74510b94e40SAlexander V. Chernikov msg.add_nla(NlAttrIp(IfaAttrType.IFA_LOCAL, str(ifa.ip))) 74610b94e40SAlexander V. Chernikov if tlv == "address": 74710b94e40SAlexander V. Chernikov msg.add_nla(NlAttrIp(IfaAttrType.IFA_ADDRESS, str(ifa.ip))) 74810b94e40SAlexander V. Chernikov 74910b94e40SAlexander V. Chernikov self.send_check_success(msg) 75010b94e40SAlexander V. Chernikov lst = self.get_ifa_list(iface.ifindex, self.get_family_from_ip(ifa.ip)) 75110b94e40SAlexander V. Chernikov rx_msg = self.find_msg_by_ifa(lst, ifa.ip) 75210b94e40SAlexander V. Chernikov assert rx_msg is None 753