1import ipaddress 2import socket 3 4import pytest 5from atf_python.sys.net.netlink import NetlinkRtMessage 6from atf_python.sys.net.netlink import NetlinkTestTemplate 7from atf_python.sys.net.netlink import NlAttrIp 8from atf_python.sys.net.netlink import NlConst 9from atf_python.sys.net.netlink import NlmBaseFlags 10from atf_python.sys.net.netlink import NlmGetFlags 11from atf_python.sys.net.netlink import NlmNewFlags 12from atf_python.sys.net.netlink import NlMsgType 13from atf_python.sys.net.netlink import NlRtMsgType 14from atf_python.sys.net.netlink import RtattrType 15from atf_python.sys.net.vnet import SingleVnetTestTemplate 16 17 18class TestRtNlRoute(NetlinkTestTemplate, SingleVnetTestTemplate): 19 IPV6_PREFIXES = ["2001:db8::1/64"] 20 21 def setup_method(self, method): 22 super().setup_method(method) 23 self.setup_netlink(NlConst.NETLINK_ROUTE) 24 25 @pytest.mark.timeout(20) 26 def test_buffer_override(self): 27 msg_flags = ( 28 NlmBaseFlags.NLM_F_ACK.value 29 | NlmBaseFlags.NLM_F_REQUEST.value 30 | NlmNewFlags.NLM_F_CREATE.value 31 ) 32 33 num_routes = 1000 34 base_address = bytearray(ipaddress.ip_address("2001:db8:ffff::").packed) 35 for i in range(num_routes): 36 base_address[7] = i % 256 37 base_address[6] = i // 256 38 prefix_address = ipaddress.IPv6Address(bytes(base_address)) 39 40 msg = NetlinkRtMessage(self.helper, NlRtMsgType.RTM_NEWROUTE.value) 41 msg.nl_hdr.nlmsg_flags = msg_flags 42 msg.base_hdr.rtm_family = socket.AF_INET6 43 msg.base_hdr.rtm_dst_len = 65 44 msg.add_nla(NlAttrIp(RtattrType.RTA_DST, str(prefix_address))) 45 msg.add_nla(NlAttrIp(RtattrType.RTA_GATEWAY, "2001:db8::2")) 46 47 self.write_message(msg, silent=True) 48 rx_msg = self.read_message(silent=True) 49 assert rx_msg.is_type(NlMsgType.NLMSG_ERROR) 50 assert msg.nl_hdr.nlmsg_seq == rx_msg.nl_hdr.nlmsg_seq 51 assert rx_msg.error_code == 0 52 # Now, dump 53 msg = NetlinkRtMessage(self.helper, NlRtMsgType.RTM_GETROUTE.value) 54 msg.nl_hdr.nlmsg_flags = ( 55 NlmBaseFlags.NLM_F_ACK.value 56 | NlmBaseFlags.NLM_F_REQUEST.value 57 | NlmGetFlags.NLM_F_ROOT.value 58 | NlmGetFlags.NLM_F_MATCH.value 59 ) 60 msg.base_hdr.rtm_family = socket.AF_INET6 61 self.write_message(msg) 62 num_received = 0 63 while True: 64 rx_msg = self.read_message(silent=True) 65 if msg.nl_hdr.nlmsg_seq == rx_msg.nl_hdr.nlmsg_seq: 66 if rx_msg.is_type(NlMsgType.NLMSG_ERROR): 67 if rx_msg.error_code != 0: 68 raise ValueError( 69 "unable to dump routes: error {}".format(rx_msg.error_code) 70 ) 71 if rx_msg.is_type(NlMsgType.NLMSG_DONE): 72 break 73 if rx_msg.is_type(NlRtMsgType.RTM_NEWROUTE): 74 if rx_msg.base_hdr.rtm_dst_len == 65: 75 num_received += 1 76 assert num_routes == num_received 77