"""Route-netlink tests: adding routes and dumping a large routing table."""
import ipaddress
import socket

import pytest
from atf_python.sys.net.tools import ToolsHelper
from atf_python.sys.net.vnet import IfaceFactory
from atf_python.sys.net.vnet import SingleVnetTestTemplate
from atf_python.sys.netlink.attrs import NlAttrIp
from atf_python.sys.netlink.attrs import NlAttrU32
from atf_python.sys.netlink.base_headers import NlmBaseFlags
from atf_python.sys.netlink.base_headers import NlmGetFlags
from atf_python.sys.netlink.base_headers import NlmNewFlags
from atf_python.sys.netlink.base_headers import NlMsgType
from atf_python.sys.netlink.netlink import NetlinkTestTemplate
from atf_python.sys.netlink.netlink_route import NetlinkRtMessage
from atf_python.sys.netlink.netlink_route import NlRtMsgType
from atf_python.sys.netlink.netlink_route import RtattrType
from atf_python.sys.netlink.utils import NlConst


def _newroute_request(helper, family, dst, dst_len, oif, gateway=None):
    """Build an RTM_NEWROUTE request flagged with NLM_F_CREATE.

    Attributes are appended in the order RTA_DST, RTA_GATEWAY (only when
    *gateway* is given), RTA_OIF.
    """
    request = NetlinkRtMessage(helper, NlRtMsgType.RTM_NEWROUTE)
    request.set_request()
    request.add_nlflags([NlmNewFlags.NLM_F_CREATE])
    request.base_hdr.rtm_family = family
    request.base_hdr.rtm_dst_len = dst_len
    request.add_nla(NlAttrIp(RtattrType.RTA_DST, dst))
    if gateway is not None:
        request.add_nla(NlAttrIp(RtattrType.RTA_GATEWAY, gateway))
    request.add_nla(NlAttrU32(RtattrType.RTA_OIF, oif))
    return request


def _assert_acked(reply):
    """Assert that *reply* is an NLMSG_ERROR message with error code 0."""
    assert reply.is_type(NlMsgType.NLMSG_ERROR)
    assert reply.error_code == 0


class TestRtNlRoute(NetlinkTestTemplate, SingleVnetTestTemplate):
    """Route add/dump tests run over a NETLINK_ROUTE socket."""

    IPV6_PREFIXES = ["2001:db8::1/64"]

    def setup_method(self, method):
        """Run the parent setup, then set up netlink for NETLINK_ROUTE."""
        super().setup_method(method)
        self.setup_netlink(NlConst.NETLINK_ROUTE)

    @pytest.mark.timeout(5)
    def test_add_route6_ll_gw(self):
        """Add an IPv6 /64 route via gateway fe80::1 on the "if1" interface."""
        oif_name = self.vnet.iface_alias_map["if1"].name
        oif_index = socket.if_nametoindex(oif_name)

        request = _newroute_request(
            self.helper,
            socket.AF_INET6,
            "2001:db8:2::",
            64,
            oif_index,
            gateway="fe80::1",
        )
        _assert_acked(self.get_reply(request))

        ToolsHelper.print_net_debug()
        ToolsHelper.print_output("netstat -6onW")

    @pytest.mark.timeout(5)
    def test_add_route6_ll_if_gw(self):
        """Add an IPv6 /64 route whose only next hop is a new tun interface."""
        created = IfaceFactory().create_iface("", "tun")
        oif_index = socket.if_nametoindex(created[0].name)

        request = _newroute_request(
            self.helper, socket.AF_INET6, "2001:db8:2::", 64, oif_index
        )
        _assert_acked(self.get_reply(request))

        ToolsHelper.print_net_debug()
        ToolsHelper.print_output("netstat -6onW")

    @pytest.mark.timeout(5)
    def test_add_route4_ll_if_gw(self):
        """Add an IPv4 /32 route whose only next hop is a new tun interface."""
        created = IfaceFactory().create_iface("", "tun")
        oif_index = socket.if_nametoindex(created[0].name)

        request = _newroute_request(
            self.helper, socket.AF_INET, "192.0.2.1", 32, oif_index
        )
        _assert_acked(self.get_reply(request))

        ToolsHelper.print_net_debug()
        ToolsHelper.print_output("netstat -4onW")

    @pytest.mark.timeout(20)
    def test_buffer_override(self):
        """Add 1000 IPv6 /65 routes, dump the table and count them back.

        Raises:
            ValueError: if the dump is answered with a non-zero error code.
        """
        route_count = 1000
        prefix_len = 65
        add_flags = (
            NlmBaseFlags.NLM_F_ACK.value
            | NlmBaseFlags.NLM_F_REQUEST.value
            | NlmNewFlags.NLM_F_CREATE.value
        )

        base_value = int(ipaddress.ip_address("2001:db8:ffff::"))
        for index in range(route_count):
            # Shifting by 64 bits places the counter in bytes 6-7 of the
            # address, i.e. its fourth 16-bit group.
            dst = ipaddress.IPv6Address(base_value | (index << 64))

            add_req = NetlinkRtMessage(self.helper, NlRtMsgType.RTM_NEWROUTE.value)
            add_req.nl_hdr.nlmsg_flags = add_flags
            add_req.base_hdr.rtm_family = socket.AF_INET6
            add_req.base_hdr.rtm_dst_len = prefix_len
            add_req.add_nla(NlAttrIp(RtattrType.RTA_DST, str(dst)))
            add_req.add_nla(NlAttrIp(RtattrType.RTA_GATEWAY, "2001:db8::2"))

            self.write_message(add_req, silent=True)
            ack = self.read_message(silent=True)
            assert ack.is_type(NlMsgType.NLMSG_ERROR)
            assert add_req.nl_hdr.nlmsg_seq == ack.nl_hdr.nlmsg_seq
            assert ack.error_code == 0

        # Request a dump of the IPv6 routes.
        dump_req = NetlinkRtMessage(self.helper, NlRtMsgType.RTM_GETROUTE.value)
        dump_req.nl_hdr.nlmsg_flags = (
            NlmBaseFlags.NLM_F_ACK.value
            | NlmBaseFlags.NLM_F_REQUEST.value
            | NlmGetFlags.NLM_F_ROOT.value
            | NlmGetFlags.NLM_F_MATCH.value
        )
        dump_req.base_hdr.rtm_family = socket.AF_INET6
        self.write_message(dump_req)

        seen = 0
        while True:
            reply = self.read_message(silent=True)
            # Skip anything that does not answer the dump request.
            if dump_req.nl_hdr.nlmsg_seq != reply.nl_hdr.nlmsg_seq:
                continue
            if reply.is_type(NlMsgType.NLMSG_ERROR) and reply.error_code != 0:
                raise ValueError(
                    "unable to dump routes: error {}".format(reply.error_code)
                )
            if reply.is_type(NlMsgType.NLMSG_DONE):
                break
            # Count only routes with the prefix length used above.
            if (
                reply.is_type(NlRtMsgType.RTM_NEWROUTE)
                and reply.base_hdr.rtm_dst_len == prefix_len
            ):
                seen += 1
        assert route_count == seen