import socket

import pytest
from atf_python.sys.net.vnet import SingleVnetTestTemplate
from atf_python.sys.netlink.netlink import NetlinkTestTemplate
from atf_python.sys.netlink.netlink_route import NdAttrType
from atf_python.sys.netlink.netlink_route import NetlinkNdMessage
from atf_python.sys.netlink.netlink_route import NlRtMsgType
from atf_python.sys.netlink.utils import NlConst


class TestRtNlNeigh(NetlinkTestTemplate, SingleVnetTestTemplate):
    """Tests for listing neighbor entries over a NETLINK_ROUTE socket."""

    def setup_method(self, method):
        """Pick address prefixes from the test name, then run the base setup.

        A "4" in the test method name sets ``IPV4_PREFIXES`` and a "6" sets
        ``IPV6_PREFIXES``. Both are assigned before ``super().setup_method()``
        runs (presumably the vnet template reads them while configuring the
        interfaces -- TODO confirm against SingleVnetTestTemplate).
        """
        method_name = method.__name__
        if "4" in method_name:
            self.IPV4_PREFIXES = ["192.0.2.1/24"]
        if "6" in method_name:
            self.IPV6_PREFIXES = ["2001:db8::1/64"]
        super().setup_method(method)
        self.setup_netlink(NlConst.NETLINK_ROUTE)

    def filter_iface(self, family, num_items):
        """Request the neighbors of interface "if1" and validate the replies.

        Sends an RTM_GETNEIGH request with ``ndm_family`` and ``ndm_ifindex``
        set, then checks that every RTM_NEWNEIGH reply read for that request
        refers to the same interface and address family and carries both the
        NDA_DST and NDA_LLADDR attributes.

        Args:
            family: address family to request (e.g. ``socket.AF_INET``).
            num_items: exact number of replies expected.

        Raises:
            AssertionError: if a reply fails validation or the number of
                replies differs from ``num_items``.
        """
        epair_ifname = self.vnet.iface_alias_map["if1"].name
        epair_ifindex = socket.if_nametoindex(epair_ifname)

        msg = NetlinkNdMessage(self.helper, NlRtMsgType.RTM_GETNEIGH)
        msg.set_request()
        msg.base_hdr.ndm_family = family
        msg.base_hdr.ndm_ifindex = epair_ifindex
        self.write_message(msg)

        entries = []
        for rx_msg in self.read_msg_list(
            msg.nl_hdr.nlmsg_seq, NlRtMsgType.RTM_NEWNEIGH
        ):
            rx_ifname = socket.if_indextoname(rx_msg.base_hdr.ndm_ifindex)
            # Keep the reply's family in its own name: reusing `family` would
            # overwrite the requested value and make the check below a
            # tautology.
            rx_family = rx_msg.base_hdr.ndm_family
            assert rx_ifname == epair_ifname, (
                f"unexpected interface {rx_ifname!r}, wanted {epair_ifname!r}"
            )
            assert rx_family == family, (
                f"unexpected address family {rx_family}, wanted {family}"
            )
            assert rx_msg.get_nla(NdAttrType.NDA_DST) is not None
            assert rx_msg.get_nla(NdAttrType.NDA_LLADDR) is not None
            entries.append(rx_msg)
        assert len(entries) == num_items, (
            f"got {len(entries)} neighbor entries, expected {num_items}"
        )

    @pytest.mark.timeout(5)
    def test_6_filter_iface(self):
        """Tests that listing outputs all nd6 records"""
        # NOTE(review): two entries are expected here, presumably one for the
        # link-local and one for the configured global address -- confirm.
        self.filter_iface(socket.AF_INET6, 2)

    @pytest.mark.timeout(5)
    def test_4_filter_iface(self):
        """Tests that listing outputs all arp records"""
        self.filter_iface(socket.AF_INET, 1)