1import ipaddress 2import socket 3import struct 4 5from atf_python.sys.net.netlink import IfattrType 6from atf_python.sys.net.netlink import NetlinkIfaMessage 7from atf_python.sys.net.netlink import NetlinkTestTemplate 8from atf_python.sys.net.netlink import NlConst 9from atf_python.sys.net.netlink import NlHelper 10from atf_python.sys.net.netlink import NlmBaseFlags 11from atf_python.sys.net.netlink import NlMsgType 12from atf_python.sys.net.netlink import NlRtMsgType 13from atf_python.sys.net.netlink import Nlsock 14from atf_python.sys.net.netlink import RtScope 15from atf_python.sys.net.vnet import SingleVnetTestTemplate 16 17 18class TestRtNlIfaddr(NetlinkTestTemplate, SingleVnetTestTemplate): 19 def setup_method(self, method): 20 method_name = method.__name__ 21 if "4" in method_name: 22 self.IPV4_PREFIXES = ["192.0.2.1/24"] 23 if "6" in method_name: 24 self.IPV6_PREFIXES = ["2001:db8::1/64"] 25 super().setup_method(method) 26 self.setup_netlink(NlConst.NETLINK_ROUTE) 27 28 def test_46_nofilter(self): 29 """Tests that listing outputs both IPv4/IPv6 and interfaces""" 30 msg = NetlinkIfaMessage(self.helper, NlRtMsgType.RTM_GETADDR.value) 31 msg.nl_hdr.nlmsg_flags = ( 32 NlmBaseFlags.NLM_F_ACK.value | NlmBaseFlags.NLM_F_REQUEST.value 33 ) 34 self.write_message(msg) 35 36 ret = [] 37 for rx_msg in self.read_msg_list(msg.nl_hdr.nlmsg_seq, NlRtMsgType.RTM_NEWADDR): 38 ifname = socket.if_indextoname(rx_msg.base_hdr.ifa_index) 39 family = rx_msg.base_hdr.ifa_family 40 ret.append((ifname, family, rx_msg)) 41 42 ifname = "lo0" 43 assert len([r for r in ret if r[0] == ifname]) > 0 44 45 ifname = self.vnet.iface_alias_map["if1"].name 46 assert len([r for r in ret if r[0] == ifname and r[1] == socket.AF_INET]) == 1 47 assert len([r for r in ret if r[0] == ifname and r[1] == socket.AF_INET6]) == 2 48 49 def test_46_filter_iface(self): 50 """Tests that listing outputs both IPv4/IPv6 for the specific interface""" 51 epair_ifname = self.vnet.iface_alias_map["if1"].name 52 53 msg = NetlinkIfaMessage(self.helper, NlRtMsgType.RTM_GETADDR.value) 54 msg.nl_hdr.nlmsg_flags = ( 55 NlmBaseFlags.NLM_F_ACK.value | NlmBaseFlags.NLM_F_REQUEST.value 56 ) 57 msg.base_hdr.ifa_index = socket.if_nametoindex(epair_ifname) 58 self.write_message(msg) 59 60 ret = [] 61 for rx_msg in self.read_msg_list(msg.nl_hdr.nlmsg_seq, NlRtMsgType.RTM_NEWADDR): 62 ifname = socket.if_indextoname(rx_msg.base_hdr.ifa_index) 63 family = rx_msg.base_hdr.ifa_family 64 ret.append((ifname, family, rx_msg)) 65 66 ifname = epair_ifname 67 assert len([r for r in ret if r[0] == ifname and r[1] == socket.AF_INET]) == 1 68 assert len([r for r in ret if r[0] == ifname and r[1] == socket.AF_INET6]) == 2 69 assert len(ret) == 3 70 71 def filter_iface_family(self, family, num_items): 72 """Tests that listing outputs IPv4 for the specific interface""" 73 epair_ifname = self.vnet.iface_alias_map["if1"].name 74 75 msg = NetlinkIfaMessage(self.helper, NlRtMsgType.RTM_GETADDR.value) 76 msg.nl_hdr.nlmsg_flags = ( 77 NlmBaseFlags.NLM_F_ACK.value | NlmBaseFlags.NLM_F_REQUEST.value 78 ) 79 msg.base_hdr.ifa_family = family 80 msg.base_hdr.ifa_index = socket.if_nametoindex(epair_ifname) 81 self.write_message(msg) 82 83 ret = [] 84 for rx_msg in self.read_msg_list(msg.nl_hdr.nlmsg_seq, NlRtMsgType.RTM_NEWADDR): 85 assert family == rx_msg.base_hdr.ifa_family 86 assert epair_ifname == socket.if_indextoname(rx_msg.base_hdr.ifa_index) 87 ret.append(rx_msg) 88 assert len(ret) == num_items 89 return ret 90 91 def test_4_broadcast(self): 92 """Tests header/attr output for listing IPv4 ifas on broadcast iface""" 93 ret = self.filter_iface_family(socket.AF_INET, 1) 94 # Should be 192.0.2.1/24 95 msg = ret[0] 96 # Family and ifindex has been checked already 97 assert msg.base_hdr.ifa_prefixlen == 24 98 # Ignore IFA_FLAGS for now 99 assert msg.base_hdr.ifa_scope == RtScope.RT_SCOPE_UNIVERSE.value 100 101 assert msg.get_nla(IfattrType.IFA_ADDRESS).addr == "192.0.2.1" 102 assert msg.get_nla(IfattrType.IFA_LOCAL).addr == "192.0.2.1" 103 assert msg.get_nla(IfattrType.IFA_BROADCAST).addr == "192.0.2.255" 104 105 epair_ifname = self.vnet.iface_alias_map["if1"].name 106 assert msg.get_nla(IfattrType.IFA_LABEL).text == epair_ifname 107 108 def test_6_broadcast(self): 109 """Tests header/attr output for listing IPv6 ifas on broadcast iface""" 110 ret = self.filter_iface_family(socket.AF_INET6, 2) 111 # Should be 192.0.2.1/24 112 if ret[0].base_hdr.ifa_scope == RtScope.RT_SCOPE_UNIVERSE.value: 113 (gmsg, lmsg) = ret 114 else: 115 (lmsg, gmsg) = ret 116 # Start with global ( 2001:db8::1/64 ) 117 msg = gmsg 118 # Family and ifindex has been checked already 119 assert msg.base_hdr.ifa_prefixlen == 64 120 # Ignore IFA_FLAGS for now 121 assert msg.base_hdr.ifa_scope == RtScope.RT_SCOPE_UNIVERSE.value 122 123 assert msg.get_nla(IfattrType.IFA_ADDRESS).addr == "2001:db8::1" 124 assert msg.get_nla(IfattrType.IFA_LOCAL) is None 125 assert msg.get_nla(IfattrType.IFA_BROADCAST) is None 126 127 epair_ifname = self.vnet.iface_alias_map["if1"].name 128 assert msg.get_nla(IfattrType.IFA_LABEL).text == epair_ifname 129 130 # Local: fe80::/64 131 msg = lmsg 132 assert msg.base_hdr.ifa_prefixlen == 64 133 # Ignore IFA_FLAGS for now 134 assert msg.base_hdr.ifa_scope == RtScope.RT_SCOPE_LINK.value 135 136 addr = ipaddress.ip_address(msg.get_nla(IfattrType.IFA_ADDRESS).addr) 137 assert addr.is_link_local 138 # Verify that ifindex is not emmbedded 139 assert struct.unpack("!H", addr.packed[2:4])[0] == 0 140 assert msg.get_nla(IfattrType.IFA_LOCAL) is None 141 assert msg.get_nla(IfattrType.IFA_BROADCAST) is None 142 143 epair_ifname = self.vnet.iface_alias_map["if1"].name 144 assert msg.get_nla(IfattrType.IFA_LABEL).text == epair_ifname 145