xref: /freebsd/tests/sys/netlink/test_rtnl_ifaddr.py (revision c07d6445eb89d9dd3950361b065b7bd110e3a043)
1import ipaddress
2import socket
3import struct
4
5from atf_python.sys.net.netlink import IfattrType
6from atf_python.sys.net.netlink import NetlinkIfaMessage
7from atf_python.sys.net.netlink import NetlinkTestTemplate
8from atf_python.sys.net.netlink import NlConst
9from atf_python.sys.net.netlink import NlHelper
10from atf_python.sys.net.netlink import NlmBaseFlags
11from atf_python.sys.net.netlink import NlMsgType
12from atf_python.sys.net.netlink import NlRtMsgType
13from atf_python.sys.net.netlink import Nlsock
14from atf_python.sys.net.netlink import RtScope
15from atf_python.sys.net.vnet import SingleVnetTestTemplate
16
17
class TestRtNlIfaddr(NetlinkTestTemplate, SingleVnetTestTemplate):
    """RTM_GETADDR listing tests for interface addresses.

    The prefixes configured for a test are selected from the test method's
    name, see setup_method().
    """

    def setup_method(self, method):
        """Select prefixes from the test name and set up the netlink socket.

        A "4" in the method name requests 192.0.2.1/24 and a "6" requests
        2001:db8::1/64; names containing both (test_46_*) get both.
        """
        method_name = method.__name__
        # The prefix lists are assigned before the parent setup runs;
        # presumably the vnet template reads them there -- TODO confirm.
        if "4" in method_name:
            self.IPV4_PREFIXES = ["192.0.2.1/24"]
        if "6" in method_name:
            self.IPV6_PREFIXES = ["2001:db8::1/64"]
        super().setup_method(method)
        self.setup_netlink(NlConst.NETLINK_ROUTE)

    def _epair_ifname(self):
        """Return the name of the interface registered under alias "if1"."""
        return self.vnet.iface_alias_map["if1"].name

    def _dump_ifaddrs(self, family=None, ifname=None):
        """Send an RTM_GETADDR request and return the RTM_NEWADDR replies.

        family: value for ifa_family; the header field is left untouched
            when None.
        ifname: interface whose index goes into ifa_index; the header field
            is left untouched when None.

        Returns whatever read_msg_list() produces for the request's
        sequence number, so callers consume the replies as they iterate.
        """
        msg = NetlinkIfaMessage(self.helper, NlRtMsgType.RTM_GETADDR.value)
        msg.nl_hdr.nlmsg_flags = (
            NlmBaseFlags.NLM_F_ACK.value | NlmBaseFlags.NLM_F_REQUEST.value
        )
        if family is not None:
            msg.base_hdr.ifa_family = family
        if ifname is not None:
            msg.base_hdr.ifa_index = socket.if_nametoindex(ifname)
        self.write_message(msg)
        return self.read_msg_list(msg.nl_hdr.nlmsg_seq, NlRtMsgType.RTM_NEWADDR)

    @staticmethod
    def _count(entries, ifname, family):
        """Count (ifname, family) pairs in entries matching both values."""
        return sum(1 for name, af in entries if name == ifname and af == family)

    def test_46_nofilter(self):
        """Tests that listing outputs both IPv4/IPv6 and interfaces"""
        ret = [
            (socket.if_indextoname(rx_msg.base_hdr.ifa_index), rx_msg.base_hdr.ifa_family)
            for rx_msg in self._dump_ifaddrs()
        ]

        # An unfiltered dump must also include addresses of other interfaces.
        assert any(name == "lo0" for name, _ in ret)

        ifname = self._epair_ifname()
        assert self._count(ret, ifname, socket.AF_INET) == 1
        # Two IPv6 entries; test_6_broadcast checks that they are the
        # configured global address and a link-local one.
        assert self._count(ret, ifname, socket.AF_INET6) == 2

    def test_46_filter_iface(self):
        """Tests that listing outputs both IPv4/IPv6 for the specific interface"""
        epair_ifname = self._epair_ifname()

        ret = [
            (socket.if_indextoname(rx_msg.base_hdr.ifa_index), rx_msg.base_hdr.ifa_family)
            for rx_msg in self._dump_ifaddrs(ifname=epair_ifname)
        ]

        assert self._count(ret, epair_ifname, socket.AF_INET) == 1
        assert self._count(ret, epair_ifname, socket.AF_INET6) == 2
        # Nothing but the three addresses above may be reported.
        assert len(ret) == 3

    def filter_iface_family(self, family, num_items):
        """Lists addresses of the given family on the "if1" interface.

        Asserts that every reply carries the requested family and interface
        and that exactly num_items replies arrive.  Returns the replies as a
        list.
        """
        epair_ifname = self._epair_ifname()

        ret = []
        for rx_msg in self._dump_ifaddrs(family=family, ifname=epair_ifname):
            assert family == rx_msg.base_hdr.ifa_family
            assert epair_ifname == socket.if_indextoname(rx_msg.base_hdr.ifa_index)
            ret.append(rx_msg)
        assert len(ret) == num_items
        return ret

    def test_4_broadcast(self):
        """Tests header/attr output for listing IPv4 ifas on broadcast iface"""
        ret = self.filter_iface_family(socket.AF_INET, 1)
        # Should be 192.0.2.1/24
        msg = ret[0]
        # Family and ifindex have been checked already
        assert msg.base_hdr.ifa_prefixlen == 24
        # Ignore IFA_FLAGS for now
        assert msg.base_hdr.ifa_scope == RtScope.RT_SCOPE_UNIVERSE.value

        assert msg.get_nla(IfattrType.IFA_ADDRESS).addr == "192.0.2.1"
        assert msg.get_nla(IfattrType.IFA_LOCAL).addr == "192.0.2.1"
        assert msg.get_nla(IfattrType.IFA_BROADCAST).addr == "192.0.2.255"

        assert msg.get_nla(IfattrType.IFA_LABEL).text == self._epair_ifname()

    def test_6_broadcast(self):
        """Tests header/attr output for listing IPv6 ifas on broadcast iface"""
        ret = self.filter_iface_family(socket.AF_INET6, 2)
        # Should be the global 2001:db8::1/64 plus a link-local address.
        # The reply order is not assumed: tell them apart by scope.
        if ret[0].base_hdr.ifa_scope == RtScope.RT_SCOPE_UNIVERSE.value:
            (gmsg, lmsg) = ret
        else:
            (lmsg, gmsg) = ret
        epair_ifname = self._epair_ifname()

        # Start with global ( 2001:db8::1/64 )
        msg = gmsg
        # Family and ifindex have been checked already
        assert msg.base_hdr.ifa_prefixlen == 64
        # Ignore IFA_FLAGS for now
        assert msg.base_hdr.ifa_scope == RtScope.RT_SCOPE_UNIVERSE.value

        assert msg.get_nla(IfattrType.IFA_ADDRESS).addr == "2001:db8::1"
        assert msg.get_nla(IfattrType.IFA_LOCAL) is None
        assert msg.get_nla(IfattrType.IFA_BROADCAST) is None

        assert msg.get_nla(IfattrType.IFA_LABEL).text == epair_ifname

        # Local: fe80::/64
        msg = lmsg
        assert msg.base_hdr.ifa_prefixlen == 64
        # Ignore IFA_FLAGS for now
        assert msg.base_hdr.ifa_scope == RtScope.RT_SCOPE_LINK.value

        addr = ipaddress.ip_address(msg.get_nla(IfattrType.IFA_ADDRESS).addr)
        assert addr.is_link_local
        # Verify that ifindex is not embedded: the second 16-bit word of the
        # reported address must be zero.
        assert struct.unpack("!H", addr.packed[2:4])[0] == 0
        assert msg.get_nla(IfattrType.IFA_LOCAL) is None
        assert msg.get_nla(IfattrType.IFA_BROADCAST) is None

        assert msg.get_nla(IfattrType.IFA_LABEL).text == epair_ifname
145