import ipaddress
import socket
import struct

from atf_python.sys.net.vnet import SingleVnetTestTemplate
from atf_python.sys.netlink.base_headers import NlmBaseFlags
from atf_python.sys.netlink.base_headers import Nlmsghdr
from atf_python.sys.netlink.netlink import NetlinkTestTemplate
from atf_python.sys.netlink.netlink_route import IfattrType
from atf_python.sys.netlink.netlink_route import NetlinkIfaMessage
from atf_python.sys.netlink.netlink_route import NlRtMsgType
from atf_python.sys.netlink.netlink_route import RtScope
from atf_python.sys.netlink.utils import NlConst

# Flag mask placed on every RTM_GETADDR request issued by these tests.
_ACK_REQUEST_FLAGS = NlmBaseFlags.NLM_F_ACK.value | NlmBaseFlags.NLM_F_REQUEST.value


class TestRtNlIfaddr(NetlinkTestTemplate, SingleVnetTestTemplate):
    """RTM_GETADDR listing tests run against the "if1" interface and lo0."""

    def setup_method(self, method):
        """Pick address prefixes from the test name, then open the socket.

        A "4" in the method name selects the IPv4 prefix and a "6" selects
        the IPv6 prefix; names containing both digits get both.
        """
        test_name = method.__name__
        # The prefix attributes are assigned before the parent setup runs;
        # presumably the vnet template reads them there -- TODO confirm.
        if "4" in test_name:
            self.IPV4_PREFIXES = ["192.0.2.1/24"]
        if "6" in test_name:
            self.IPV6_PREFIXES = ["2001:db8::1/64"]
        super().setup_method(method)
        self.setup_netlink(NlConst.NETLINK_ROUTE)

    def _new_getaddr_request(self):
        """Return an RTM_GETADDR message carrying the ACK|REQUEST flags."""
        request = NetlinkIfaMessage(self.helper, NlRtMsgType.RTM_GETADDR.value)
        request.nl_hdr.nlmsg_flags = _ACK_REQUEST_FLAGS
        return request

    def _read_addr_replies(self, seq):
        """Return (ifname, family, message) per RTM_NEWADDR reply to `seq`."""
        return [
            (
                socket.if_indextoname(reply.base_hdr.ifa_index),
                reply.base_hdr.ifa_family,
                reply,
            )
            for reply in self.read_msg_list(seq, NlRtMsgType.RTM_NEWADDR)
        ]

    @staticmethod
    def _count_addrs(entries, ifname, family):
        """Count the entries that match both `ifname` and `family`."""
        return sum(
            1 for name, fam, _ in entries if name == ifname and fam == family
        )

    def test_46_nofilter(self):
        """Tests that listing outputs both IPv4/IPv6 and interfaces

        The request leaves family and interface index unset.
        """
        request = self._new_getaddr_request()
        self.write_message(request)
        entries = self._read_addr_replies(request.nl_hdr.nlmsg_seq)

        # The loopback interface has to show up at least once.
        assert any(name == "lo0" for name, _, _ in entries)

        if1_name = self.vnet.iface_alias_map["if1"].name
        assert self._count_addrs(entries, if1_name, socket.AF_INET) == 1
        # Two IPv6 entries: presumably the configured global address and
        # the link-local one (see test_6_broadcast).
        assert self._count_addrs(entries, if1_name, socket.AF_INET6) == 2

    def test_46_filter_iface(self):
        """Tests that listing outputs both IPv4/IPv6 for the specific interface

        Only the interface index is set in the request; the family is not.
        """
        epair_ifname = self.vnet.iface_alias_map["if1"].name

        request = self._new_getaddr_request()
        request.base_hdr.ifa_index = socket.if_nametoindex(epair_ifname)
        self.write_message(request)
        entries = self._read_addr_replies(request.nl_hdr.nlmsg_seq)

        assert self._count_addrs(entries, epair_ifname, socket.AF_INET) == 1
        assert self._count_addrs(entries, epair_ifname, socket.AF_INET6) == 2
        # Nothing besides those three addresses may be returned.
        assert len(entries) == 3

    def test_46_filter_family_compat(self):
        """Tests that family filtering works with the stripped header

        The request is assembled by hand: a netlink header followed by a
        single byte holding the address family.
        """
        hdr = Nlmsghdr(
            # Presumably the 16-byte nlmsghdr plus the one family byte
            # appended below -- TODO confirm the header size.
            nlmsg_len=17,
            nlmsg_type=NlRtMsgType.RTM_GETADDR.value,
            nlmsg_flags=_ACK_REQUEST_FLAGS,
            nlmsg_seq=self.helper.get_seq(),
        )
        family_byte = struct.pack("@B", socket.AF_INET)
        self.nlsock.write_data(bytes(hdr) + family_byte)

        entries = self._read_addr_replies(hdr.nlmsg_seq)
        # NOTE(review): two IPv4 entries are expected, presumably one on
        # lo0 and one on if1 -- confirm.
        assert len(entries) == 2

    def filter_iface_family(self, family, num_items):
        """Request the addresses of `family` on "if1" and validate the replies.

        Every reply must carry the requested family and the index of the
        "if1" interface, and exactly `num_items` replies must arrive.

        Returns the list of received RTM_NEWADDR messages.
        """
        epair_ifname = self.vnet.iface_alias_map["if1"].name

        request = self._new_getaddr_request()
        request.base_hdr.ifa_family = family
        request.base_hdr.ifa_index = socket.if_nametoindex(epair_ifname)
        self.write_message(request)

        replies = []
        for reply in self.read_msg_list(
            request.nl_hdr.nlmsg_seq, NlRtMsgType.RTM_NEWADDR
        ):
            reply_hdr = reply.base_hdr
            assert family == reply_hdr.ifa_family
            assert epair_ifname == socket.if_indextoname(reply_hdr.ifa_index)
            replies.append(reply)
        assert len(replies) == num_items
        return replies

    def test_4_broadcast(self):
        """Tests header/attr output for listing IPv4 ifas on broadcast iface"""
        # Exactly one reply is expected: 192.0.2.1/24.
        (msg,) = self.filter_iface_family(socket.AF_INET, 1)

        # Family and ifindex were already verified by filter_iface_family;
        # IFA_FLAGS is not checked for now.
        assert msg.base_hdr.ifa_prefixlen == 24
        assert msg.base_hdr.ifa_scope == RtScope.RT_SCOPE_UNIVERSE.value

        expected_addrs = (
            (IfattrType.IFA_ADDRESS, "192.0.2.1"),
            (IfattrType.IFA_LOCAL, "192.0.2.1"),
            (IfattrType.IFA_BROADCAST, "192.0.2.255"),
        )
        for attr_type, expected in expected_addrs:
            assert msg.get_nla(attr_type).addr == expected

        epair_ifname = self.vnet.iface_alias_map["if1"].name
        assert msg.get_nla(IfattrType.IFA_LABEL).text == epair_ifname

    def test_6_broadcast(self):
        """Tests header/attr output for listing IPv6 ifas on broadcast iface"""
        # Two replies are expected: the global 2001:db8::1/64 and a
        # link-local address; they may arrive in either order.
        first, second = self.filter_iface_family(socket.AF_INET6, 2)
        if first.base_hdr.ifa_scope == RtScope.RT_SCOPE_UNIVERSE.value:
            gmsg, lmsg = first, second
        else:
            gmsg, lmsg = second, first

        epair_ifname = self.vnet.iface_alias_map["if1"].name
        absent_attrs = (IfattrType.IFA_LOCAL, IfattrType.IFA_BROADCAST)

        # Global address (2001:db8::1/64).  Family and ifindex were already
        # verified by filter_iface_family; IFA_FLAGS is not checked for now.
        assert gmsg.base_hdr.ifa_prefixlen == 64
        assert gmsg.base_hdr.ifa_scope == RtScope.RT_SCOPE_UNIVERSE.value
        assert gmsg.get_nla(IfattrType.IFA_ADDRESS).addr == "2001:db8::1"
        for attr_type in absent_attrs:
            assert gmsg.get_nla(attr_type) is None
        assert gmsg.get_nla(IfattrType.IFA_LABEL).text == epair_ifname

        # Link-local address (fe80::/64); IFA_FLAGS is not checked for now.
        assert lmsg.base_hdr.ifa_prefixlen == 64
        assert lmsg.base_hdr.ifa_scope == RtScope.RT_SCOPE_LINK.value

        ll_addr = ipaddress.ip_address(lmsg.get_nla(IfattrType.IFA_ADDRESS).addr)
        assert ll_addr.is_link_local
        # Verify that the ifindex is not embedded: the second 16-bit word
        # of the address must be zero.
        assert int.from_bytes(ll_addr.packed[2:4], "big") == 0
        for attr_type in absent_attrs:
            assert lmsg.get_nla(attr_type) is None
        assert lmsg.get_nla(IfattrType.IFA_LABEL).text == epair_ifname