import ipaddress
import socket
import struct

from atf_python.sys.net.netlink import IfattrType
from atf_python.sys.net.netlink import NetlinkIfaMessage
from atf_python.sys.net.netlink import NetlinkTestTemplate
from atf_python.sys.net.netlink import NlConst
from atf_python.sys.net.netlink import NlHelper
from atf_python.sys.net.netlink import NlmBaseFlags
from atf_python.sys.net.netlink import Nlmsghdr
from atf_python.sys.net.netlink import NlMsgType
from atf_python.sys.net.netlink import NlRtMsgType
from atf_python.sys.net.netlink import Nlsock
from atf_python.sys.net.netlink import RtScope
from atf_python.sys.net.vnet import SingleVnetTestTemplate


class TestRtNlIfaddr(NetlinkTestTemplate, SingleVnetTestTemplate):
    """RTM_GETADDR listing tests run over a NETLINK_ROUTE socket.

    Which address families get configured is driven by the test method
    name: see setup_method().
    """

    def setup_method(self, method):
        """Select prefixes from the test name, then open the netlink socket.

        A "4" in the method name sets IPV4_PREFIXES and a "6" sets
        IPV6_PREFIXES, so a "test_46_*" method gets both.  The attributes
        are assigned before super().setup_method() is called.
        """
        method_name = method.__name__
        if "4" in method_name:
            self.IPV4_PREFIXES = ["192.0.2.1/24"]
        if "6" in method_name:
            self.IPV6_PREFIXES = ["2001:db8::1/64"]
        super().setup_method(method)
        self.setup_netlink(NlConst.NETLINK_ROUTE)

    def _new_getaddr_request(self):
        """Return an RTM_GETADDR message flagged NLM_F_ACK | NLM_F_REQUEST."""
        msg = NetlinkIfaMessage(self.helper, NlRtMsgType.RTM_GETADDR.value)
        msg.nl_hdr.nlmsg_flags = (
            NlmBaseFlags.NLM_F_ACK.value | NlmBaseFlags.NLM_F_REQUEST.value
        )
        return msg

    def _read_addrs(self, seq):
        """Collect the RTM_NEWADDR replies for request sequence number seq.

        Returns a list of (ifname, family, message) tuples in the order
        read_msg_list() yields the messages.
        """
        return [
            (
                socket.if_indextoname(rx_msg.base_hdr.ifa_index),
                rx_msg.base_hdr.ifa_family,
                rx_msg,
            )
            for rx_msg in self.read_msg_list(seq, NlRtMsgType.RTM_NEWADDR)
        ]

    @staticmethod
    def _count_addrs(entries, ifname, family):
        """Count the _read_addrs() entries matching both ifname and family."""
        return sum(1 for name, fam, _ in entries if name == ifname and fam == family)

    def test_46_nofilter(self):
        """Tests that listing outputs both IPv4/IPv6 and interfaces"""
        msg = self._new_getaddr_request()
        self.write_message(msg)

        ret = self._read_addrs(msg.nl_hdr.nlmsg_seq)

        # The loopback interface must show up with at least one address.
        assert any(name == "lo0" for name, _, _ in ret)

        ifname = self.vnet.iface_alias_map["if1"].name
        assert self._count_addrs(ret, ifname, socket.AF_INET) == 1
        assert self._count_addrs(ret, ifname, socket.AF_INET6) == 2

    def test_46_filter_iface(self):
        """Tests that listing outputs both IPv4/IPv6 for the specific interface"""
        epair_ifname = self.vnet.iface_alias_map["if1"].name

        msg = self._new_getaddr_request()
        msg.base_hdr.ifa_index = socket.if_nametoindex(epair_ifname)
        self.write_message(msg)

        ret = self._read_addrs(msg.nl_hdr.nlmsg_seq)

        assert self._count_addrs(ret, epair_ifname, socket.AF_INET) == 1
        assert self._count_addrs(ret, epair_ifname, socket.AF_INET6) == 2
        # Nothing besides the three addresses counted above may be returned.
        assert len(ret) == 3

    def test_46_filter_family_compat(self):
        """Tests that family filtering works with the stripped header"""

        # The request is the bare netlink header followed by one byte holding
        # the address family, instead of a full NetlinkIfaMessage.
        # nlmsg_len=17 presumably is a 16-byte nlmsghdr plus that single
        # byte -- confirm against the Nlmsghdr definition.
        hdr = Nlmsghdr(
            nlmsg_len=17,
            nlmsg_type=NlRtMsgType.RTM_GETADDR.value,
            nlmsg_flags=NlmBaseFlags.NLM_F_ACK.value | NlmBaseFlags.NLM_F_REQUEST.value,
            nlmsg_seq=self.helper.get_seq()
        )
        data = bytes(hdr) + struct.pack("@B", socket.AF_INET)
        self.nlsock.write_data(data)

        ret = self._read_addrs(hdr.nlmsg_seq)
        # Only the number of replies is checked here; presumably these are
        # the IPv4 addresses of lo0 and of the epair -- TODO confirm.
        assert len(ret) == 2

    def filter_iface_family(self, family, num_items):
        """Request the addresses of the "if1" interface for a single family.

        Sends RTM_GETADDR with both ifa_family and ifa_index set, asserts
        that every reply matches that family and interface and that exactly
        num_items replies arrive, then returns the list of reply messages.
        """
        epair_ifname = self.vnet.iface_alias_map["if1"].name

        msg = self._new_getaddr_request()
        msg.base_hdr.ifa_family = family
        msg.base_hdr.ifa_index = socket.if_nametoindex(epair_ifname)
        self.write_message(msg)

        ret = []
        for rx_msg in self.read_msg_list(msg.nl_hdr.nlmsg_seq, NlRtMsgType.RTM_NEWADDR):
            assert family == rx_msg.base_hdr.ifa_family
            assert epair_ifname == socket.if_indextoname(rx_msg.base_hdr.ifa_index)
            ret.append(rx_msg)
        assert len(ret) == num_items
        return ret

    def test_4_broadcast(self):
        """Tests header/attr output for listing IPv4 ifas on broadcast iface"""
        ret = self.filter_iface_family(socket.AF_INET, 1)
        # Should be 192.0.2.1/24
        msg = ret[0]
        # Family and ifindex have been checked already
        assert msg.base_hdr.ifa_prefixlen == 24
        # Ignore IFA_FLAGS for now
        assert msg.base_hdr.ifa_scope == RtScope.RT_SCOPE_UNIVERSE.value

        assert msg.get_nla(IfattrType.IFA_ADDRESS).addr == "192.0.2.1"
        assert msg.get_nla(IfattrType.IFA_LOCAL).addr == "192.0.2.1"
        assert msg.get_nla(IfattrType.IFA_BROADCAST).addr == "192.0.2.255"

        epair_ifname = self.vnet.iface_alias_map["if1"].name
        assert msg.get_nla(IfattrType.IFA_LABEL).text == epair_ifname

    def test_6_broadcast(self):
        """Tests header/attr output for listing IPv6 ifas on broadcast iface"""
        ret = self.filter_iface_family(socket.AF_INET6, 2)
        # Should be the global 2001:db8::1/64 and a link-local address,
        # in either order; tell them apart by scope.
        if ret[0].base_hdr.ifa_scope == RtScope.RT_SCOPE_UNIVERSE.value:
            (gmsg, lmsg) = ret
        else:
            (lmsg, gmsg) = ret
        # Start with global ( 2001:db8::1/64 )
        msg = gmsg
        # Family and ifindex have been checked already
        assert msg.base_hdr.ifa_prefixlen == 64
        # Ignore IFA_FLAGS for now
        assert msg.base_hdr.ifa_scope == RtScope.RT_SCOPE_UNIVERSE.value

        assert msg.get_nla(IfattrType.IFA_ADDRESS).addr == "2001:db8::1"
        assert msg.get_nla(IfattrType.IFA_LOCAL) is None
        assert msg.get_nla(IfattrType.IFA_BROADCAST) is None

        epair_ifname = self.vnet.iface_alias_map["if1"].name
        assert msg.get_nla(IfattrType.IFA_LABEL).text == epair_ifname

        # Local: fe80::/64
        msg = lmsg
        assert msg.base_hdr.ifa_prefixlen == 64
        # Ignore IFA_FLAGS for now
        assert msg.base_hdr.ifa_scope == RtScope.RT_SCOPE_LINK.value

        addr = ipaddress.ip_address(msg.get_nla(IfattrType.IFA_ADDRESS).addr)
        assert addr.is_link_local
        # Verify that ifindex is not embedded: the second 16-bit word of the
        # address must be zero.
        assert struct.unpack("!H", addr.packed[2:4])[0] == 0
        assert msg.get_nla(IfattrType.IFA_LOCAL) is None
        assert msg.get_nla(IfattrType.IFA_BROADCAST) is None

        epair_ifname = self.vnet.iface_alias_map["if1"].name
        assert msg.get_nla(IfattrType.IFA_LABEL).text == epair_ifname