xref: /freebsd/tests/sys/netlink/test_rtnl_route.py (revision 4530e0c3e78d0616367d37273d6c1f47f627839b)
1import ipaddress
2import socket
3
4import pytest
5from atf_python.sys.net.netlink import NetlinkRtMessage
6from atf_python.sys.net.netlink import NetlinkTestTemplate
7from atf_python.sys.net.netlink import NlAttrIp
8from atf_python.sys.net.netlink import NlConst
9from atf_python.sys.net.netlink import NlmBaseFlags
10from atf_python.sys.net.netlink import NlmGetFlags
11from atf_python.sys.net.netlink import NlmNewFlags
12from atf_python.sys.net.netlink import NlMsgType
13from atf_python.sys.net.netlink import NlRtMsgType
14from atf_python.sys.net.netlink import RtattrType
15from atf_python.sys.net.vnet import SingleVnetTestTemplate
16
17
18class TestRtNlRoute(NetlinkTestTemplate, SingleVnetTestTemplate):
19    IPV6_PREFIXES = ["2001:db8::1/64"]
20
21    def setup_method(self, method):
22        super().setup_method(method)
23        self.setup_netlink(NlConst.NETLINK_ROUTE)
24
25    @pytest.mark.timeout(20)
26    def test_buffer_override(self):
27        msg_flags = (
28            NlmBaseFlags.NLM_F_ACK.value
29            | NlmBaseFlags.NLM_F_REQUEST.value
30            | NlmNewFlags.NLM_F_CREATE.value
31        )
32
33        num_routes = 1000
34        base_address = bytearray(ipaddress.ip_address("2001:db8:ffff::").packed)
35        for i in range(num_routes):
36            base_address[7] = i % 256
37            base_address[6] = i // 256
38            prefix_address = ipaddress.IPv6Address(bytes(base_address))
39
40            msg = NetlinkRtMessage(self.helper, NlRtMsgType.RTM_NEWROUTE.value)
41            msg.nl_hdr.nlmsg_flags = msg_flags
42            msg.base_hdr.rtm_family = socket.AF_INET6
43            msg.base_hdr.rtm_dst_len = 65
44            msg.add_nla(NlAttrIp(RtattrType.RTA_DST, str(prefix_address)))
45            msg.add_nla(NlAttrIp(RtattrType.RTA_GATEWAY, "2001:db8::2"))
46
47            self.write_message(msg, silent=True)
48            rx_msg = self.read_message(silent=True)
49            assert rx_msg.is_type(NlMsgType.NLMSG_ERROR)
50            assert msg.nl_hdr.nlmsg_seq == rx_msg.nl_hdr.nlmsg_seq
51            assert rx_msg.error_code == 0
52        # Now, dump
53        msg = NetlinkRtMessage(self.helper, NlRtMsgType.RTM_GETROUTE.value)
54        msg.nl_hdr.nlmsg_flags = (
55            NlmBaseFlags.NLM_F_ACK.value
56            | NlmBaseFlags.NLM_F_REQUEST.value
57            | NlmGetFlags.NLM_F_ROOT.value
58            | NlmGetFlags.NLM_F_MATCH.value
59        )
60        msg.base_hdr.rtm_family = socket.AF_INET6
61        self.write_message(msg)
62        num_received = 0
63        while True:
64            rx_msg = self.read_message(silent=True)
65            if msg.nl_hdr.nlmsg_seq == rx_msg.nl_hdr.nlmsg_seq:
66                if rx_msg.is_type(NlMsgType.NLMSG_ERROR):
67                    if rx_msg.error_code != 0:
68                        raise ValueError(
69                            "unable to dump routes: error {}".format(rx_msg.error_code)
70                        )
71                if rx_msg.is_type(NlMsgType.NLMSG_DONE):
72                    break
73                if rx_msg.is_type(NlRtMsgType.RTM_NEWROUTE):
74                    if rx_msg.base_hdr.rtm_dst_len == 65:
75                        num_received += 1
76        assert num_routes == num_received
77