import socket

import pytest
from atf_python.sys.net.vnet import SingleVnetTestTemplate
from atf_python.sys.netlink.netlink import NetlinkTestTemplate
from atf_python.sys.netlink.netlink_route import NdAttrType
from atf_python.sys.netlink.netlink_route import NetlinkNdMessage
from atf_python.sys.netlink.netlink_route import NlRtMsgType
from atf_python.sys.netlink.utils import NlConst


class TestRtNlNeigh(NetlinkTestTemplate, SingleVnetTestTemplate):
    """Netlink RTM_GETNEIGH tests filtered by interface and address family."""

    def setup_method(self, method):
        """Select address prefixes from the test name, then set up netlink.

        A "4" in the test method name sets ``IPV4_PREFIXES``; a "6" sets
        ``IPV6_PREFIXES``.  Both are assigned before delegating to the parent
        ``setup_method`` (presumably so the vnet template can configure the
        addresses -- confirm against the template).  A NETLINK_ROUTE socket
        is set up afterwards.
        """
        method_name = method.__name__
        if "4" in method_name:
            self.IPV4_PREFIXES = ["192.0.2.1/24"]
        if "6" in method_name:
            self.IPV6_PREFIXES = ["2001:db8::1/64"]
        super().setup_method(method)
        self.setup_netlink(NlConst.NETLINK_ROUTE)

    def filter_iface(self, family, num_items):
        """Request neighbours for the "if1" interface and validate the replies.

        Sends an RTM_GETNEIGH request with the header's family and interface
        index filled in, then checks every RTM_NEWNEIGH reply.

        Args:
            family: address family to request (e.g. ``socket.AF_INET``).
            num_items: exact number of neighbour records expected back.

        Raises:
            AssertionError: if a reply refers to another interface or
                address family, lacks NDA_DST or NDA_LLADDR, or the number
                of replies differs from ``num_items``.
        """
        epair_ifname = self.vnet.iface_alias_map["if1"].name
        epair_ifindex = socket.if_nametoindex(epair_ifname)

        msg = NetlinkNdMessage(self.helper, NlRtMsgType.RTM_GETNEIGH)
        msg.set_request()
        msg.base_hdr.ndm_family = family
        msg.base_hdr.ndm_ifindex = epair_ifindex
        self.write_message(msg)

        num_received = 0
        for rx_msg in self.read_msg_list(
            msg.nl_hdr.nlmsg_seq, NlRtMsgType.RTM_NEWNEIGH
        ):
            rx_ifname = socket.if_indextoname(rx_msg.base_hdr.ndm_ifindex)
            # Keep the reply's family in its own name: rebinding `family`
            # would turn the check below into a tautology.
            rx_family = rx_msg.base_hdr.ndm_family
            assert rx_ifname == epair_ifname
            assert rx_family == family
            assert rx_msg.get_nla(NdAttrType.NDA_DST) is not None
            assert rx_msg.get_nla(NdAttrType.NDA_LLADDR) is not None
            num_received += 1
        assert num_received == num_items

    @pytest.mark.timeout(5)
    def test_6_filter_iface(self):
        """Tests that listing outputs all nd6 records"""
        self.filter_iface(socket.AF_INET6, 2)

    @pytest.mark.timeout(5)
    def test_4_filter_iface(self):
        """Tests that listing outputs all arp records"""
        self.filter_iface(socket.AF_INET, 1)