import socket
import pytest

from atf_python.sys.net.netlink import NdAttrType
from atf_python.sys.net.netlink import NetlinkNdMessage
from atf_python.sys.net.netlink import NetlinkTestTemplate
from atf_python.sys.net.netlink import NlConst
from atf_python.sys.net.netlink import NlRtMsgType
from atf_python.sys.net.vnet import SingleVnetTestTemplate


class TestRtNlNeigh(NetlinkTestTemplate, SingleVnetTestTemplate):
    """Neighbour-table (RTM_GETNEIGH) listing tests over a NETLINK_ROUTE socket."""

    def setup_method(self, method):
        """Select address prefixes from the test name, then set up vnet and netlink.

        A "4" in the test method name sets ``IPV4_PREFIXES``; a "6" sets
        ``IPV6_PREFIXES``.  Both are assigned before ``super().setup_method()``
        runs -- presumably the vnet template reads them to configure the
        interface addresses (TODO confirm against SingleVnetTestTemplate).
        """
        method_name = method.__name__
        if "4" in method_name:
            self.IPV4_PREFIXES = ["192.0.2.1/24"]
        if "6" in method_name:
            self.IPV6_PREFIXES = ["2001:db8::1/64"]
        super().setup_method(method)
        self.setup_netlink(NlConst.NETLINK_ROUTE)

    def filter_iface(self, family, num_items):
        """Request neighbour entries for interface "if1" and validate the replies.

        Sends an RTM_GETNEIGH request carrying ``family`` and the ifindex of
        the "if1" interface, then checks every RTM_NEWNEIGH reply.

        Args:
            family: address family placed in the request (e.g. socket.AF_INET)
                and expected in every reply.
            num_items: exact number of replies expected.

        Raises:
            AssertionError: if a reply refers to another interface or another
                family, lacks NDA_DST or NDA_LLADDR, or the number of replies
                differs from ``num_items``.
        """
        epair_ifname = self.vnet.iface_alias_map["if1"].name
        epair_ifindex = socket.if_nametoindex(epair_ifname)

        msg = NetlinkNdMessage(self.helper, NlRtMsgType.RTM_GETNEIGH)
        msg.set_request()
        msg.base_hdr.ndm_family = family
        msg.base_hdr.ndm_ifindex = epair_ifindex
        self.write_message(msg)

        ret = []
        for rx_msg in self.read_msg_list(
            msg.nl_hdr.nlmsg_seq, NlRtMsgType.RTM_NEWNEIGH
        ):
            ifname = socket.if_indextoname(rx_msg.base_hdr.ndm_ifindex)
            # Keep the reply's family in its own name: rebinding `family`
            # here would turn the comparison below into a tautology.
            rx_family = rx_msg.base_hdr.ndm_family
            assert ifname == epair_ifname
            assert rx_family == family
            assert rx_msg.get_nla(NdAttrType.NDA_DST) is not None
            assert rx_msg.get_nla(NdAttrType.NDA_LLADDR) is not None
            ret.append(rx_msg)
        assert len(ret) == num_items

    @pytest.mark.timeout(5)
    def test_6_filter_iface(self):
        """Tests that listing outputs all nd6 records"""
        # NOTE(review): 2 entries presumably correspond to the link-local and
        # the configured global address -- confirm.
        self.filter_iface(socket.AF_INET6, 2)

    @pytest.mark.timeout(5)
    def test_4_filter_iface(self):
        """Tests that listing outputs all arp records"""
        # NOTE(review): 1 entry presumably corresponds to the configured
        # IPv4 address -- confirm.
        self.filter_iface(socket.AF_INET, 1)