"""Tests for adding and dumping routes over a NETLINK_ROUTE socket."""
import ipaddress
import socket

import pytest
from atf_python.sys.net.tools import ToolsHelper
from atf_python.sys.net.vnet import IfaceFactory
from atf_python.sys.net.vnet import SingleVnetTestTemplate
from atf_python.sys.netlink.attrs import NlAttrIp
from atf_python.sys.netlink.attrs import NlAttrU32
from atf_python.sys.netlink.base_headers import NlmBaseFlags
from atf_python.sys.netlink.base_headers import NlmGetFlags
from atf_python.sys.netlink.base_headers import NlmNewFlags
from atf_python.sys.netlink.base_headers import NlMsgType
from atf_python.sys.netlink.netlink import NetlinkTestTemplate
from atf_python.sys.netlink.netlink_route import NetlinkRtMessage
from atf_python.sys.netlink.netlink_route import NlRtMsgType
from atf_python.sys.netlink.netlink_route import RtattrType
from atf_python.sys.netlink.utils import NlConst


class TestRtNlRoute(NetlinkTestTemplate, SingleVnetTestTemplate):
    """RTM_NEWROUTE / RTM_GETROUTE checks issued over a NETLINK_ROUTE socket."""

    IPV6_PREFIXES = ["2001:db8::1/64"]

    def setup_method(self, method):
        """Run the parent setup, then set up netlink for NETLINK_ROUTE."""
        super().setup_method(method)
        self.setup_netlink(NlConst.NETLINK_ROUTE)

    def _new_route_request(self, family, dst, dst_len):
        """Build an RTM_NEWROUTE request with NLM_F_CREATE and an RTA_DST.

        Callers append any further attributes (RTA_GATEWAY, RTA_OIF)
        themselves, so the attribute order in the message stays under their
        control.
        """
        request = NetlinkRtMessage(self.helper, NlRtMsgType.RTM_NEWROUTE)
        request.set_request()
        request.add_nlflags([NlmNewFlags.NLM_F_CREATE])
        request.base_hdr.rtm_family = family
        request.base_hdr.rtm_dst_len = dst_len
        request.add_nla(NlAttrIp(RtattrType.RTA_DST, dst))
        return request

    def _assert_accepted(self, request):
        """Send *request* and require an NLMSG_ERROR reply with error code 0."""
        reply = self.get_reply(request)
        assert reply.is_type(NlMsgType.NLMSG_ERROR)
        assert reply.error_code == 0

    @pytest.mark.timeout(5)
    def test_add_route6_ll_gw(self):
        """Add 2001:db8:2::/64 via gateway fe80::1 on the "if1" interface."""
        ifname = self.vnet.iface_alias_map["if1"].name
        ifindex = socket.if_nametoindex(ifname)

        request = self._new_route_request(socket.AF_INET6, "2001:db8:2::", 64)
        request.add_nla(NlAttrIp(RtattrType.RTA_GATEWAY, "fe80::1"))
        request.add_nla(NlAttrU32(RtattrType.RTA_OIF, ifindex))
        self._assert_accepted(request)

        ToolsHelper.print_net_debug()
        ToolsHelper.print_output("netstat -6onW")

    @pytest.mark.timeout(5)
    def test_add_route6_ll_if_gw(self):
        """Add 2001:db8:2::/64 with only an RTA_OIF naming a new tun iface."""
        self.require_module("if_tun")
        created = IfaceFactory().create_iface("", "tun")
        ifindex = socket.if_nametoindex(created[0].name)

        request = self._new_route_request(socket.AF_INET6, "2001:db8:2::", 64)
        request.add_nla(NlAttrU32(RtattrType.RTA_OIF, ifindex))
        self._assert_accepted(request)

        ToolsHelper.print_net_debug()
        ToolsHelper.print_output("netstat -6onW")

    @pytest.mark.timeout(5)
    def test_add_route4_ll_if_gw(self):
        """Add 192.0.2.1/32 with only an RTA_OIF naming a new tun iface."""
        self.require_module("if_tun")
        created = IfaceFactory().create_iface("", "tun")
        ifindex = socket.if_nametoindex(created[0].name)

        request = self._new_route_request(socket.AF_INET, "192.0.2.1", 32)
        request.add_nla(NlAttrU32(RtattrType.RTA_OIF, ifindex))
        self._assert_accepted(request)

        ToolsHelper.print_net_debug()
        ToolsHelper.print_output("netstat -4onW")

    @pytest.mark.timeout(20)
    def test_buffer_override(self):
        """Insert 1000 IPv6 /65 routes, dump the IPv6 routes and count them.

        The dump is expected to return exactly as many /65 routes as were
        added; any /65 route in the dump is counted, not only the prefixes
        created here.
        NOTE(review): the test name suggests this targets the kernel's
        handling of a large dump reply -- confirm against the change that
        introduced it.
        """
        create_flags = (
            NlmBaseFlags.NLM_F_REQUEST.value
            | NlmBaseFlags.NLM_F_ACK.value
            | NlmNewFlags.NLM_F_CREATE.value
        )
        dump_flags = (
            NlmBaseFlags.NLM_F_REQUEST.value
            | NlmBaseFlags.NLM_F_ACK.value
            | NlmGetFlags.NLM_F_ROOT.value
            | NlmGetFlags.NLM_F_MATCH.value
        )
        # The unusual /65 length is also what identifies routes in the dump.
        marker_dst_len = 65
        num_routes = 1000

        raw_prefix = bytearray(ipaddress.ip_address("2001:db8:ffff::").packed)
        for index in range(num_routes):
            # Spread the counter over bytes 6 and 7 (the fourth 16-bit group),
            # giving every route a distinct destination.
            raw_prefix[6], raw_prefix[7] = divmod(index, 256)
            destination = str(ipaddress.IPv6Address(bytes(raw_prefix)))

            request = NetlinkRtMessage(
                self.helper, NlRtMsgType.RTM_NEWROUTE.value
            )
            request.nl_hdr.nlmsg_flags = create_flags
            request.base_hdr.rtm_family = socket.AF_INET6
            request.base_hdr.rtm_dst_len = marker_dst_len
            request.add_nla(NlAttrIp(RtattrType.RTA_DST, destination))
            request.add_nla(NlAttrIp(RtattrType.RTA_GATEWAY, "2001:db8::2"))

            self.write_message(request, silent=True)
            ack = self.read_message(silent=True)
            assert ack.is_type(NlMsgType.NLMSG_ERROR)
            assert request.nl_hdr.nlmsg_seq == ack.nl_hdr.nlmsg_seq
            assert ack.error_code == 0

        # Now, dump
        dump_request = NetlinkRtMessage(
            self.helper, NlRtMsgType.RTM_GETROUTE.value
        )
        dump_request.nl_hdr.nlmsg_flags = dump_flags
        dump_request.base_hdr.rtm_family = socket.AF_INET6
        self.write_message(dump_request)

        num_received = 0
        while True:
            reply = self.read_message(silent=True)
            # Messages carrying a different sequence number are ignored.
            if reply.nl_hdr.nlmsg_seq != dump_request.nl_hdr.nlmsg_seq:
                continue
            if reply.is_type(NlMsgType.NLMSG_ERROR) and reply.error_code != 0:
                raise ValueError(
                    "unable to dump routes: error {}".format(reply.error_code)
                )
            if reply.is_type(NlMsgType.NLMSG_DONE):
                break
            if (
                reply.is_type(NlRtMsgType.RTM_NEWROUTE)
                and reply.base_hdr.rtm_dst_len == marker_dst_len
            ):
                num_received += 1
        assert num_routes == num_received