xref: /freebsd/tests/sys/netlink/test_rtnl_ifaddr.py (revision 5d8b48487acc8375675f2b7c4507c98ac5d0bf75)
1import ipaddress
2import socket
3import struct
4
5import pytest
6from atf_python.sys.net.vnet import SingleVnetTestTemplate
7from atf_python.sys.netlink.attrs import NlAttr
8from atf_python.sys.netlink.attrs import NlAttrIp
9from atf_python.sys.netlink.attrs import NlAttrNested
10from atf_python.sys.netlink.attrs import NlAttrU32
11from atf_python.sys.netlink.base_headers import NlmBaseFlags
12from atf_python.sys.netlink.base_headers import NlmNewFlags
13from atf_python.sys.netlink.base_headers import Nlmsghdr
14from atf_python.sys.netlink.message import NlMsgType
15from atf_python.sys.netlink.netlink import NetlinkTestTemplate
16from atf_python.sys.netlink.netlink import Nlsock
17from atf_python.sys.netlink.netlink_generic import CarpAttrType
18from atf_python.sys.netlink.netlink_generic import CarpGenMessage
19from atf_python.sys.netlink.netlink_generic import CarpMsgType
20from atf_python.sys.netlink.netlink_route import IfaAttrType
21from atf_python.sys.netlink.netlink_route import IfaCacheInfo
22from atf_python.sys.netlink.netlink_route import IfafAttrType
23from atf_python.sys.netlink.netlink_route import IfafFlags6
24from atf_python.sys.netlink.netlink_route import IfaFlags
25from atf_python.sys.netlink.netlink_route import NetlinkIfaMessage
26from atf_python.sys.netlink.netlink_route import NlRtMsgType
27from atf_python.sys.netlink.netlink_route import RtScope
28from atf_python.sys.netlink.utils import enum_or_int
29from atf_python.sys.netlink.utils import NlConst
30
31
class TestRtNlIfaddrList(NetlinkTestTemplate, SingleVnetTestTemplate):
    """Tests for RTM_GETADDR dumps: unfiltered, per-interface and per-family.

    The prefixes configured for each test are selected from the test method
    name in ``setup_method``.
    """

    def setup_method(self, method):
        """Select the prefixes for this test from its name, then set up netlink.

        A "4" in the method name requests the IPv4 prefix (plus
        169.254.169.254/16 for "nofilter" tests); a "6" requests the IPv6
        prefix.  The attributes are assigned before ``super().setup_method()``
        runs -- presumably the vnet template reads them while creating the
        interfaces (confirm against SingleVnetTestTemplate).
        """
        method_name = method.__name__
        if "4" in method_name:
            if "nofilter" in method_name:
                self.IPV4_PREFIXES = ["192.0.2.1/24", "169.254.169.254/16"]
            else:
                self.IPV4_PREFIXES = ["192.0.2.1/24"]
        if "6" in method_name:
            self.IPV6_PREFIXES = ["2001:db8::1/64"]
        super().setup_method(method)
        self.setup_netlink(NlConst.NETLINK_ROUTE)

    @staticmethod
    def _count_ifas(ifas, ifname, family=None, scope=None):
        """Count collected tuples for ``ifname`` matching family and scope.

        ``ifas`` holds tuples whose first element is the interface name, the
        second the address family and the third (when ``scope`` is given) the
        address scope.  ``None`` for ``family`` or ``scope`` is a wildcard.
        """
        return sum(
            1
            for ifa in ifas
            if ifa[0] == ifname
            and (family is None or ifa[1] == family)
            and (scope is None or ifa[2] == scope)
        )

    def test_46_nofilter(self):
        """Tests that listing outputs both IPv4/IPv6 and interfaces"""
        msg = NetlinkIfaMessage(self.helper, NlRtMsgType.RTM_GETADDR.value)
        msg.set_request()
        self.write_message(msg)

        ret = []
        for rx_msg in self.read_msg_list(msg.nl_hdr.nlmsg_seq, NlRtMsgType.RTM_NEWADDR):
            ifname = socket.if_indextoname(rx_msg.base_hdr.ifa_index)
            family = rx_msg.base_hdr.ifa_family
            scope = rx_msg.base_hdr.ifa_scope
            ret.append((ifname, family, scope))

        scope_host = RtScope.RT_SCOPE_HOST.value
        scope_link = RtScope.RT_SCOPE_LINK.value
        scope_universe = RtScope.RT_SCOPE_UNIVERSE.value

        # Loopback: one host-scope address per family plus an IPv6 link-local
        ifname = "lo0"
        assert self._count_ifas(ret, ifname, socket.AF_INET, scope_host) == 1
        assert self._count_ifas(ret, ifname, socket.AF_INET6, scope_host) == 1
        assert self._count_ifas(ret, ifname, socket.AF_INET6, scope_link) == 1
        assert self._count_ifas(ret, ifname) == 3

        # Test interface: the two configured IPv4 prefixes (link-scope
        # 169.254/16 and universe-scope 192.0.2/24), the configured IPv6
        # prefix and an IPv6 link-local
        ifname = self.vnet.iface_alias_map["if1"].name
        assert self._count_ifas(ret, ifname, socket.AF_INET, scope_link) == 1
        assert self._count_ifas(ret, ifname, socket.AF_INET, scope_universe) == 1
        assert self._count_ifas(ret, ifname, socket.AF_INET6, scope_link) == 1
        assert self._count_ifas(ret, ifname, socket.AF_INET6, scope_universe) == 1
        assert self._count_ifas(ret, ifname) == 4

    def test_46_filter_iface(self):
        """Tests that listing outputs both IPv4/IPv6 for the specific interface"""
        epair_ifname = self.vnet.iface_alias_map["if1"].name

        msg = NetlinkIfaMessage(self.helper, NlRtMsgType.RTM_GETADDR.value)
        msg.set_request()
        msg.base_hdr.ifa_index = socket.if_nametoindex(epair_ifname)
        self.write_message(msg)

        ret = []
        for rx_msg in self.read_msg_list(msg.nl_hdr.nlmsg_seq, NlRtMsgType.RTM_NEWADDR):
            ifname = socket.if_indextoname(rx_msg.base_hdr.ifa_index)
            family = rx_msg.base_hdr.ifa_family
            ret.append((ifname, family, rx_msg))

        ifname = epair_ifname
        assert self._count_ifas(ret, ifname, socket.AF_INET) == 1
        assert self._count_ifas(ret, ifname, socket.AF_INET6) == 2
        # Nothing from any other interface may be returned
        assert len(ret) == 3

    def test_46_filter_family_compat(self):
        """Tests that family filtering works with the stripped header"""

        # Hand-build the request: the payload is a single address-family byte
        # instead of the full ifaddr header, hence nlmsg_len=17 (the netlink
        # message header is 16 bytes long, plus 1 byte of payload).
        hdr = Nlmsghdr(
            nlmsg_len=17,
            nlmsg_type=NlRtMsgType.RTM_GETADDR.value,
            nlmsg_flags=NlmBaseFlags.NLM_F_ACK.value | NlmBaseFlags.NLM_F_REQUEST.value,
            nlmsg_seq=self.helper.get_seq(),
        )
        data = bytes(hdr) + struct.pack("@B", socket.AF_INET)
        self.nlsock.write_data(data)

        ret = []
        for rx_msg in self.read_msg_list(hdr.nlmsg_seq, NlRtMsgType.RTM_NEWADDR):
            ifname = socket.if_indextoname(rx_msg.base_hdr.ifa_index)
            family = rx_msg.base_hdr.ifa_family
            # Only the requested family may be returned; counting alone would
            # not catch a reply of the right size but the wrong family.
            assert family == socket.AF_INET
            ret.append((ifname, family, rx_msg))
        assert len(ret) == 2

    def filter_iface_family(self, family, num_items):
        """Lists the addresses of the given family on the test interface.

        Asserts that every returned message has the requested family and
        belongs to the "if1" interface, and that exactly ``num_items``
        messages are returned.  Returns the list of received messages.
        """
        epair_ifname = self.vnet.iface_alias_map["if1"].name

        msg = NetlinkIfaMessage(self.helper, NlRtMsgType.RTM_GETADDR.value)
        msg.set_request()
        msg.base_hdr.ifa_family = family
        msg.base_hdr.ifa_index = socket.if_nametoindex(epair_ifname)
        self.write_message(msg)

        ret = []
        for rx_msg in self.read_msg_list(msg.nl_hdr.nlmsg_seq, NlRtMsgType.RTM_NEWADDR):
            assert family == rx_msg.base_hdr.ifa_family
            assert epair_ifname == socket.if_indextoname(rx_msg.base_hdr.ifa_index)
            ret.append(rx_msg)
        assert len(ret) == num_items
        return ret

    def test_4_broadcast(self):
        """Tests header/attr output for listing IPv4 ifas on broadcast iface"""
        ret = self.filter_iface_family(socket.AF_INET, 1)
        # Should be 192.0.2.1/24
        msg = ret[0]
        # Family and ifindex have been checked already
        assert msg.base_hdr.ifa_prefixlen == 24
        # Ignore IFA_FLAGS for now
        assert msg.base_hdr.ifa_scope == RtScope.RT_SCOPE_UNIVERSE.value

        assert msg.get_nla(IfaAttrType.IFA_ADDRESS).addr == "192.0.2.1"
        assert msg.get_nla(IfaAttrType.IFA_LOCAL).addr == "192.0.2.1"
        assert msg.get_nla(IfaAttrType.IFA_BROADCAST).addr == "192.0.2.255"

        epair_ifname = self.vnet.iface_alias_map["if1"].name
        assert msg.get_nla(IfaAttrType.IFA_LABEL).text == epair_ifname

    def test_6_broadcast(self):
        """Tests header/attr output for listing IPv6 ifas on broadcast iface"""
        ret = self.filter_iface_family(socket.AF_INET6, 2)
        # Should be 2001:db8::1/64 and a link-local address, in either order
        if ret[0].base_hdr.ifa_scope == RtScope.RT_SCOPE_UNIVERSE.value:
            (gmsg, lmsg) = ret
        else:
            (lmsg, gmsg) = ret
        # Start with global ( 2001:db8::1/64 )
        msg = gmsg
        # Family and ifindex have been checked already
        assert msg.base_hdr.ifa_prefixlen == 64
        # Ignore IFA_FLAGS for now
        assert msg.base_hdr.ifa_scope == RtScope.RT_SCOPE_UNIVERSE.value

        assert msg.get_nla(IfaAttrType.IFA_ADDRESS).addr == "2001:db8::1"
        assert msg.get_nla(IfaAttrType.IFA_LOCAL) is None
        assert msg.get_nla(IfaAttrType.IFA_BROADCAST) is None

        epair_ifname = self.vnet.iface_alias_map["if1"].name
        assert msg.get_nla(IfaAttrType.IFA_LABEL).text == epair_ifname

        # Local: fe80::/64
        msg = lmsg
        assert msg.base_hdr.ifa_prefixlen == 64
        # Ignore IFA_FLAGS for now
        assert msg.base_hdr.ifa_scope == RtScope.RT_SCOPE_LINK.value

        addr = ipaddress.ip_address(msg.get_nla(IfaAttrType.IFA_ADDRESS).addr)
        assert addr.is_link_local
        # Verify that ifindex is not embedded (bytes 2-3 of the address are 0)
        assert struct.unpack("!H", addr.packed[2:4])[0] == 0
        assert msg.get_nla(IfaAttrType.IFA_LOCAL) is None
        assert msg.get_nla(IfaAttrType.IFA_BROADCAST) is None

        epair_ifname = self.vnet.iface_alias_map["if1"].name
        assert msg.get_nla(IfaAttrType.IFA_LABEL).text == epair_ifname
182
183
class RtnlIfaOps(NetlinkTestTemplate, SingleVnetTestTemplate):
    """Common helpers for tests that add, list and delete interface addresses."""

    def setup_method(self, method):
        """Run the template setup, then call setup_netlink() for NETLINK_ROUTE."""
        super().setup_method(method)
        self.setup_netlink(NlConst.NETLINK_ROUTE)

    def send_check_success(self, msg):
        """Pass ``msg`` to get_reply() and require an NLMSG_ERROR reply with code 0."""
        reply = self.get_reply(msg)
        assert reply.is_type(NlMsgType.NLMSG_ERROR)
        assert reply.error_code == 0

    @staticmethod
    def get_family_from_ip(ip):
        """Return AF_INET for a version-4 ``ip``, AF_INET6 for anything else."""
        return socket.AF_INET if ip.version == 4 else socket.AF_INET6

    def create_msg(self, ifa):
        """Build an RTM_NEWADDR request (NLM_F_CREATE|NLM_F_EXCL) for ``ifa`` on if1.

        Only the header is filled in (family, ifindex, prefix length); the
        caller adds the address attributes.
        """
        target_iface = self.vnet.iface_alias_map["if1"]

        request = NetlinkIfaMessage(self.helper, NlRtMsgType.RTM_NEWADDR.value)
        request.set_request()
        create_excl = NlmNewFlags.NLM_F_EXCL.value | NlmNewFlags.NLM_F_CREATE.value
        request.nl_hdr.nlmsg_flags |= create_excl

        request.base_hdr.ifa_family = self.get_family_from_ip(ifa.ip)
        request.base_hdr.ifa_index = target_iface.ifindex
        request.base_hdr.ifa_prefixlen = ifa.network.prefixlen
        return request

    def get_ifa_list(self, ifindex=0, family=0):
        """Issue RTM_GETADDR with the given header filters; return the RTM_NEWADDR replies."""
        request = NetlinkIfaMessage(self.helper, NlRtMsgType.RTM_GETADDR.value)
        request.set_request()
        request.base_hdr.ifa_family = family
        request.base_hdr.ifa_index = ifindex
        self.write_message(request)

        seq = request.nl_hdr.nlmsg_seq
        return self.read_msg_list(seq, NlRtMsgType.RTM_NEWADDR)

    def find_msg_by_ifa(self, msg_list, ip):
        """Return the first message whose IFA_ADDRESS equals ``str(ip)``, else None."""
        matching = (
            candidate
            for candidate in msg_list
            if candidate.get_nla(IfaAttrType.IFA_ADDRESS).addr == str(ip)
        )
        return next(matching, None)

    def setup_dummy_carp(self, ifindex: int, vhid: int):
        """Send CARP_NL_CMD_SET for ``vhid`` on ``ifindex`` over a generic netlink socket.

        Calls require_module("carp") first and asserts that the reply is an
        NLMSG_ERROR message with error code 0.
        """
        self.require_module("carp")

        genl_sock = Nlsock(NlConst.NETLINK_GENERIC, self.helper)
        carp_family_id = genl_sock.get_genl_family_id("carp")

        request = CarpGenMessage(
            self.helper, carp_family_id, CarpMsgType.CARP_NL_CMD_SET
        )
        request.set_request()
        carp_attrs = (
            (CarpAttrType.CARP_NL_VHID, vhid),
            (CarpAttrType.CARP_NL_IFINDEX, ifindex),
        )
        for attr_type, attr_value in carp_attrs:
            request.add_nla(NlAttrU32(attr_type, attr_value))

        reply = genl_sock.get_reply(request)
        assert reply.is_type(NlMsgType.NLMSG_ERROR)
        assert reply.error_code == 0
241
242
class TestRtNlIfaddrOpsBroadcast(RtnlIfaOps):
    """Address add/delete tests against the default "if1" test interface."""

    def test_add_4(self):
        """Tests IPv4 address addition to the standard broadcast interface"""
        ifa = ipaddress.ip_interface("192.0.2.1/24")
        ifa_brd = ifa.network.broadcast_address
        iface = self.vnet.iface_alias_map["if1"]

        msg = self.create_msg(ifa)
        msg.add_nla(NlAttrIp(IfaAttrType.IFA_LOCAL, str(ifa.ip)))
        msg.add_nla(NlAttrIp(IfaAttrType.IFA_BROADCAST, str(ifa_brd)))

        self.send_check_success(msg)

        lst = self.get_ifa_list(iface.ifindex, self.get_family_from_ip(ifa.ip))
        assert len(lst) == 1
        rx_msg = lst[0]

        assert rx_msg.base_hdr.ifa_prefixlen == ifa.network.prefixlen
        assert rx_msg.base_hdr.ifa_scope == RtScope.RT_SCOPE_UNIVERSE.value

        assert rx_msg.get_nla(IfaAttrType.IFA_ADDRESS).addr == str(ifa.ip)
        assert rx_msg.get_nla(IfaAttrType.IFA_LOCAL).addr == str(ifa.ip)
        assert rx_msg.get_nla(IfaAttrType.IFA_BROADCAST).addr == str(ifa_brd)

    # Each parameter is (prefix length, auto broadcast?, expected broadcast).
    # With auto broadcast the request carries no IFA_BROADCAST attribute and
    # the expected value is what the listing must report on its own.
    @pytest.mark.parametrize(
        "brd",
        [
            pytest.param((32, True, "192.0.2.1"), id="auto_32"),
            pytest.param((31, True, "255.255.255.255"), id="auto_31"),
            pytest.param((30, True, "192.0.2.3"), id="auto_30"),
            pytest.param((30, False, "192.0.2.2"), id="custom_30"),
            pytest.param((24, False, "192.0.2.7"), id="custom_24"),
        ],
    )
    def test_add_4_brd(self, brd):
        """Tests proper broadcast setup when adding IPv4 ifa"""
        plen, auto_brd, ifa_brd_str = brd
        ifa = ipaddress.ip_interface("192.0.2.1/{}".format(plen))
        iface = self.vnet.iface_alias_map["if1"]
        ifa_brd = ipaddress.ip_address(ifa_brd_str)

        msg = self.create_msg(ifa)
        msg.add_nla(NlAttrIp(IfaAttrType.IFA_LOCAL, str(ifa.ip)))
        if not auto_brd:
            msg.add_nla(NlAttrIp(IfaAttrType.IFA_BROADCAST, str(ifa_brd)))

        self.send_check_success(msg)

        lst = self.get_ifa_list(iface.ifindex, self.get_family_from_ip(ifa.ip))
        assert len(lst) == 1
        rx_msg = lst[0]

        assert rx_msg.base_hdr.ifa_prefixlen == ifa.network.prefixlen
        assert rx_msg.base_hdr.ifa_scope == RtScope.RT_SCOPE_UNIVERSE.value

        assert rx_msg.get_nla(IfaAttrType.IFA_ADDRESS).addr == str(ifa.ip)
        assert rx_msg.get_nla(IfaAttrType.IFA_LOCAL).addr == str(ifa.ip)
        assert rx_msg.get_nla(IfaAttrType.IFA_BROADCAST).addr == str(ifa_brd)

    def test_add_6(self):
        """Tests IPv6 address addition to the standard broadcast interface"""
        ifa = ipaddress.ip_interface("2001:db8::1/64")
        iface = self.vnet.iface_alias_map["if1"]

        msg = self.create_msg(ifa)
        msg.add_nla(NlAttrIp(IfaAttrType.IFA_LOCAL, str(ifa.ip)))

        self.send_check_success(msg)

        lst = self.get_ifa_list(iface.ifindex, self.get_family_from_ip(ifa.ip))
        # Two IPv6 addresses are expected: the one just added plus one more
        # (presumably the link-local address), so look ours up by address
        assert len(lst) == 2
        rx_msg_gu = self.find_msg_by_ifa(lst, ifa.ip)
        assert rx_msg_gu is not None

        assert rx_msg_gu.base_hdr.ifa_prefixlen == ifa.network.prefixlen
        assert rx_msg_gu.base_hdr.ifa_scope == RtScope.RT_SCOPE_UNIVERSE.value
        assert rx_msg_gu.get_nla(IfaAttrType.IFA_ADDRESS).addr == str(ifa.ip)

    def test_add_4_carp(self):
        """Tests IPv4 address addition with a CARP vhid passed in IFA_FREEBSD"""
        ifa = ipaddress.ip_interface("192.0.2.1/24")
        ifa_brd = ifa.network.broadcast_address
        iface = self.vnet.iface_alias_map["if1"]
        vhid = 77

        # The vhid is configured on the interface before it is referenced
        self.setup_dummy_carp(iface.ifindex, vhid)

        msg = self.create_msg(ifa)
        msg.add_nla(NlAttrIp(IfaAttrType.IFA_LOCAL, str(ifa.ip)))
        msg.add_nla(NlAttrIp(IfaAttrType.IFA_BROADCAST, str(ifa_brd)))
        attrs_bsd = [NlAttrU32(IfafAttrType.IFAF_VHID, vhid)]
        msg.add_nla(NlAttrNested(IfaAttrType.IFA_FREEBSD, attrs_bsd))

        self.send_check_success(msg)

        lst = self.get_ifa_list(iface.ifindex, self.get_family_from_ip(ifa.ip))
        assert len(lst) == 1
        rx_msg = lst[0]

        assert rx_msg.base_hdr.ifa_prefixlen == ifa.network.prefixlen
        assert rx_msg.base_hdr.ifa_scope == RtScope.RT_SCOPE_UNIVERSE.value

        assert rx_msg.get_nla(IfaAttrType.IFA_ADDRESS).addr == str(ifa.ip)
        assert rx_msg.get_nla(IfaAttrType.IFA_LOCAL).addr == str(ifa.ip)
        assert rx_msg.get_nla(IfaAttrType.IFA_BROADCAST).addr == str(ifa_brd)
        ifa_bsd = rx_msg.get_nla(IfaAttrType.IFA_FREEBSD)
        assert ifa_bsd.get_nla(IfafAttrType.IFAF_VHID).u32 == vhid

    def test_add_6_carp(self):
        """Tests IPv6 address addition with a CARP vhid passed in IFA_FREEBSD"""
        ifa = ipaddress.ip_interface("2001:db8::1/64")
        iface = self.vnet.iface_alias_map["if1"]
        vhid = 77

        # The vhid is configured on the interface before it is referenced
        self.setup_dummy_carp(iface.ifindex, vhid)

        msg = self.create_msg(ifa)
        msg.add_nla(NlAttrIp(IfaAttrType.IFA_LOCAL, str(ifa.ip)))
        attrs_bsd = [NlAttrU32(IfafAttrType.IFAF_VHID, vhid)]
        msg.add_nla(NlAttrNested(IfaAttrType.IFA_FREEBSD, attrs_bsd))

        self.send_check_success(msg)

        lst = self.get_ifa_list(iface.ifindex, self.get_family_from_ip(ifa.ip))
        assert len(lst) == 2
        rx_msg_gu = self.find_msg_by_ifa(lst, ifa.ip)
        assert rx_msg_gu is not None

        assert rx_msg_gu.base_hdr.ifa_prefixlen == ifa.network.prefixlen
        assert rx_msg_gu.base_hdr.ifa_scope == RtScope.RT_SCOPE_UNIVERSE.value
        assert rx_msg_gu.get_nla(IfaAttrType.IFA_ADDRESS).addr == str(ifa.ip)
        ifa_bsd = rx_msg_gu.get_nla(IfaAttrType.IFA_FREEBSD)
        assert ifa_bsd.get_nla(IfafAttrType.IFAF_VHID).u32 == vhid

    def test_add_6_lifetime(self):
        """Tests that lifetimes passed in IFA_CACHEINFO are reported back"""
        ifa = ipaddress.ip_interface("2001:db8::1/64")
        iface = self.vnet.iface_alias_map["if1"]
        pref_time = 43200
        valid_time = 86400

        ci = IfaCacheInfo(ifa_prefered=pref_time, ifa_valid=valid_time)

        msg = self.create_msg(ifa)
        msg.add_nla(NlAttrIp(IfaAttrType.IFA_LOCAL, str(ifa.ip)))
        msg.add_nla(NlAttr(IfaAttrType.IFA_CACHEINFO, bytes(ci)))

        self.send_check_success(msg)

        lst = self.get_ifa_list(iface.ifindex, self.get_family_from_ip(ifa.ip))
        assert len(lst) == 2
        rx_msg = self.find_msg_by_ifa(lst, ifa.ip)
        assert rx_msg is not None

        ci = rx_msg.get_nla(IfaAttrType.IFA_CACHEINFO).ci
        # The reported lifetimes may already have counted down a little
        # between the add and the dump; accept a decrease of up to 5
        assert pref_time - 5 <= ci.ifa_prefered <= pref_time
        assert valid_time - 5 <= ci.ifa_valid <= valid_time
        assert ci.cstamp > 0
        assert ci.tstamp > 0
        assert ci.tstamp >= ci.cstamp

    @pytest.mark.parametrize(
        "flags_str",
        [
            "autoconf",
            "deprecated",
            "autoconf,deprecated",
            "prefer_source",
        ],
    )
    def test_add_6_flags(self, flags_str):
        """Tests that IFA_FLAGS and IFAF_FLAGS set on addition are reported back

        ``flags_str`` is a comma-separated list of keys of ``flags_map``.
        """
        ifa = ipaddress.ip_interface("2001:db8::1/64")
        iface = self.vnet.iface_alias_map["if1"]

        # "nl" is the contribution to the IFA_FLAGS attribute, "f" the
        # contribution to the IFAF_FLAGS attribute nested in IFA_FREEBSD
        flags_map = {
            "autoconf": {"nl": 0, "f": IfafFlags6.IN6_IFF_AUTOCONF},
            "deprecated": {
                "nl": IfaFlags.IFA_F_DEPRECATED,
                "f": IfafFlags6.IN6_IFF_DEPRECATED,
            },
            "prefer_source": {"nl": 0, "f": IfafFlags6.IN6_IFF_PREFER_SOURCE},
        }
        nl_flags = 0
        f_flags = 0

        for flag_str in flags_str.split(","):
            # Index directly so that a misspelled flag name in the
            # parametrization fails with KeyError instead of silently
            # testing an address with no flags at all
            d = flags_map[flag_str]
            nl_flags |= enum_or_int(d["nl"])
            f_flags |= enum_or_int(d["f"])

        msg = self.create_msg(ifa)
        msg.add_nla(NlAttrIp(IfaAttrType.IFA_LOCAL, str(ifa.ip)))
        msg.add_nla(NlAttrU32(IfaAttrType.IFA_FLAGS, nl_flags))
        attrs_bsd = [NlAttrU32(IfafAttrType.IFAF_FLAGS, f_flags)]
        msg.add_nla(NlAttrNested(IfaAttrType.IFA_FREEBSD, attrs_bsd))

        self.send_check_success(msg)

        lst = self.get_ifa_list(iface.ifindex, self.get_family_from_ip(ifa.ip))
        assert len(lst) == 2
        rx_msg = self.find_msg_by_ifa(lst, ifa.ip)
        assert rx_msg is not None

        assert rx_msg.get_nla(IfaAttrType.IFA_FLAGS).u32 == nl_flags
        ifa_bsd = rx_msg.get_nla(IfaAttrType.IFA_FREEBSD)
        assert ifa_bsd.get_nla(IfafAttrType.IFAF_FLAGS).u32 == f_flags

    def test_add_4_empty_message(self):
        """Tests correct failure w/ empty message"""
        msg = NetlinkIfaMessage(self.helper, NlRtMsgType.RTM_NEWADDR.value)
        msg.set_request()
        msg.nl_hdr.nlmsg_flags |= (
            NlmNewFlags.NLM_F_EXCL.value | NlmNewFlags.NLM_F_CREATE.value
        )

        rx_msg = self.get_reply(msg)
        assert rx_msg.is_type(NlMsgType.NLMSG_ERROR)
        assert rx_msg.error_code != 0

    def test_add_4_empty_ifindex(self):
        """Tests correct failure w/ empty ifindex"""
        ifa = ipaddress.ip_interface("192.0.2.1/24")
        ifa_brd = ifa.network.broadcast_address

        msg = self.create_msg(ifa)
        # Override the ifindex filled in by create_msg()
        msg.base_hdr.ifa_index = 0
        msg.add_nla(NlAttrIp(IfaAttrType.IFA_LOCAL, str(ifa.ip)))
        msg.add_nla(NlAttrIp(IfaAttrType.IFA_BROADCAST, str(ifa_brd)))

        rx_msg = self.get_reply(msg)
        assert rx_msg.is_type(NlMsgType.NLMSG_ERROR)
        assert rx_msg.error_code != 0

    def test_add_4_empty_addr(self):
        """Tests correct failure w/ empty address"""
        ifa = ipaddress.ip_interface("192.0.2.1/24")
        ifa_brd = ifa.network.broadcast_address

        msg = self.create_msg(ifa)
        # Neither IFA_LOCAL nor IFA_ADDRESS is supplied, only the broadcast
        msg.add_nla(NlAttrIp(IfaAttrType.IFA_BROADCAST, str(ifa_brd)))

        rx_msg = self.get_reply(msg)
        assert rx_msg.is_type(NlMsgType.NLMSG_ERROR)
        assert rx_msg.error_code != 0

    @pytest.mark.parametrize(
        "ifa_str",
        [
            pytest.param("192.0.2.1/32", id="ipv4_host"),
            pytest.param("192.0.2.1/24", id="ipv4_prefix"),
            pytest.param("2001:db8::1/64", id="ipv6_gu_prefix"),
            pytest.param("2001:db8::1/128", id="ipv6_gu_host"),
        ],
    )
    @pytest.mark.parametrize(
        "tlv",
        [
            pytest.param("local", id="ifa_local"),
            pytest.param("address", id="ifa_address"),
        ],
    )
    def test_del(self, tlv, ifa_str):
        """Tests address deletion from the standard broadcast interface"""
        ifa = ipaddress.ip_interface(ifa_str)
        ifa_brd = ifa.network.broadcast_address
        iface = self.vnet.iface_alias_map["if1"]

        # Add the address first and make sure it is listed
        msg = self.create_msg(ifa)
        msg.add_nla(NlAttrIp(IfaAttrType.IFA_LOCAL, str(ifa.ip)))
        msg.add_nla(NlAttrIp(IfaAttrType.IFA_BROADCAST, str(ifa_brd)))

        self.send_check_success(msg)
        lst = self.get_ifa_list(iface.ifindex, self.get_family_from_ip(ifa.ip))
        rx_msg = self.find_msg_by_ifa(lst, ifa.ip)
        assert rx_msg is not None

        # Delete it, naming the address in the attribute selected by `tlv`
        msg = NetlinkIfaMessage(self.helper, NlRtMsgType.RTM_DELADDR.value)
        msg.set_request()
        msg.base_hdr.ifa_family = self.get_family_from_ip(ifa.ip)
        msg.base_hdr.ifa_index = iface.ifindex
        msg.base_hdr.ifa_prefixlen = ifa.network.prefixlen

        if tlv == "local":
            msg.add_nla(NlAttrIp(IfaAttrType.IFA_LOCAL, str(ifa.ip)))
        if tlv == "address":
            msg.add_nla(NlAttrIp(IfaAttrType.IFA_ADDRESS, str(ifa.ip)))

        self.send_check_success(msg)
        lst = self.get_ifa_list(iface.ifindex, self.get_family_from_ip(ifa.ip))
        rx_msg = self.find_msg_by_ifa(lst, ifa.ip)
        assert rx_msg is None
530
531
class TestRtNlIfaddrOpsP2p(RtnlIfaOps):
    """Address add/delete tests against a point-to-point test interface.

    In the listings checked here IFA_LOCAL carries the local address and
    IFA_ADDRESS the peer address, so listed addresses are looked up by peer.
    """

    # Interface type for "if1"; presumably read by the vnet template when it
    # creates the test interface -- confirm against SingleVnetTestTemplate
    IFTYPE = "gif"

    @pytest.mark.parametrize(
        "ifa_pair",
        [
            pytest.param(["192.0.2.1/24", "192.0.2.2"], id="dst_inside_24"),
            pytest.param(["192.0.2.1/30", "192.0.2.2"], id="dst_inside_30"),
            pytest.param(["192.0.2.1/31", "192.0.2.2"], id="dst_inside_31"),
            pytest.param(["192.0.2.1/32", "192.0.2.2"], id="dst_outside_32"),
            pytest.param(["192.0.2.1/30", "192.0.2.100"], id="dst_outside_30"),
        ],
    )
    def test_add_4(self, ifa_pair):
        """Tests IPv4 address addition to the p2p interface"""
        ifa = ipaddress.ip_interface(ifa_pair[0])
        peer_ip = ipaddress.ip_address(ifa_pair[1])
        iface = self.vnet.iface_alias_map["if1"]

        msg = self.create_msg(ifa)
        msg.add_nla(NlAttrIp(IfaAttrType.IFA_LOCAL, str(ifa.ip)))
        msg.add_nla(NlAttrIp(IfaAttrType.IFA_ADDRESS, str(peer_ip)))

        self.send_check_success(msg)

        lst = self.get_ifa_list(iface.ifindex, self.get_family_from_ip(ifa.ip))
        assert len(lst) == 1
        rx_msg = lst[0]

        assert rx_msg.base_hdr.ifa_prefixlen == ifa.network.prefixlen
        assert rx_msg.base_hdr.ifa_scope == RtScope.RT_SCOPE_UNIVERSE.value

        assert rx_msg.get_nla(IfaAttrType.IFA_LOCAL).addr == str(ifa.ip)
        assert rx_msg.get_nla(IfaAttrType.IFA_ADDRESS).addr == str(peer_ip)

    @pytest.mark.parametrize(
        "ifa_pair",
        [
            pytest.param(
                ["2001:db8::1/64", "2001:db8::2"],
                id="dst_inside_64",
                marks=pytest.mark.xfail(reason="currently fails"),
            ),
            pytest.param(
                ["2001:db8::1/127", "2001:db8::2"],
                id="dst_inside_127",
                marks=pytest.mark.xfail(reason="currently fails"),
            ),
            pytest.param(["2001:db8::1/128", "2001:db8::2"], id="dst_outside_128"),
            pytest.param(
                ["2001:db8::1/64", "2001:db8:2::2"],
                id="dst_outside_64",
                marks=pytest.mark.xfail(reason="currently fails"),
            ),
        ],
    )
    def test_add_6(self, ifa_pair):
        """Tests IPv6 address addition to the p2p interface"""
        ifa = ipaddress.ip_interface(ifa_pair[0])
        peer_ip = ipaddress.ip_address(ifa_pair[1])
        iface = self.vnet.iface_alias_map["if1"]

        msg = self.create_msg(ifa)
        msg.add_nla(NlAttrIp(IfaAttrType.IFA_LOCAL, str(ifa.ip)))
        msg.add_nla(NlAttrIp(IfaAttrType.IFA_ADDRESS, str(peer_ip)))

        self.send_check_success(msg)

        lst = self.get_ifa_list(iface.ifindex, self.get_family_from_ip(ifa.ip))
        assert len(lst) == 2
        # IFA_ADDRESS holds the peer here, so that is the lookup key
        rx_msg_gu = self.find_msg_by_ifa(lst, peer_ip)
        assert rx_msg_gu is not None

        assert rx_msg_gu.base_hdr.ifa_prefixlen == ifa.network.prefixlen
        assert rx_msg_gu.base_hdr.ifa_scope == RtScope.RT_SCOPE_UNIVERSE.value
        assert rx_msg_gu.get_nla(IfaAttrType.IFA_LOCAL).addr == str(ifa.ip)
        assert rx_msg_gu.get_nla(IfaAttrType.IFA_ADDRESS).addr == str(peer_ip)

    @pytest.mark.parametrize(
        "ifa_pair",
        [
            pytest.param(["192.0.2.1/30", "192.0.2.2"], id="ipv4_dst_inside_30"),
            pytest.param(["192.0.2.1/32", "192.0.2.2"], id="ipv4_dst_outside_32"),
            pytest.param(["2001:db8::1/128", "2001:db8::2"], id="ip6_dst_outside_128"),
        ],
    )
    # tlv_pair is (IFA_ADDRESS content, IFA_LOCAL content) for the delete
    # request: "a" puts the local address into the attribute, "p" the peer
    # address, an empty string omits the attribute.
    @pytest.mark.parametrize(
        "tlv_pair",
        [
            pytest.param(["a", ""], id="ifa_addr=addr"),
            pytest.param(["", "a"], id="ifa_local=addr"),
            pytest.param(["a", "a"], id="ifa_addr=addr,ifa_local=addr"),
        ],
    )
    def test_del(self, tlv_pair, ifa_pair):
        """Tests address deletion from the P2P interface"""
        ifa = ipaddress.ip_interface(ifa_pair[0])
        peer_ip = ipaddress.ip_address(ifa_pair[1])
        iface = self.vnet.iface_alias_map["if1"]
        ifa_addr_str, ifa_local_str = tlv_pair

        # Add the address first and make sure it is listed
        msg = self.create_msg(ifa)
        msg.add_nla(NlAttrIp(IfaAttrType.IFA_LOCAL, str(ifa.ip)))
        msg.add_nla(NlAttrIp(IfaAttrType.IFA_ADDRESS, str(peer_ip)))

        self.send_check_success(msg)
        lst = self.get_ifa_list(iface.ifindex, self.get_family_from_ip(ifa.ip))
        rx_msg = self.find_msg_by_ifa(lst, peer_ip)
        assert rx_msg is not None

        msg = NetlinkIfaMessage(self.helper, NlRtMsgType.RTM_DELADDR.value)
        msg.set_request()
        msg.base_hdr.ifa_family = self.get_family_from_ip(ifa.ip)
        msg.base_hdr.ifa_index = iface.ifindex
        msg.base_hdr.ifa_prefixlen = ifa.network.prefixlen

        if "a" in ifa_addr_str:
            msg.add_nla(NlAttrIp(IfaAttrType.IFA_ADDRESS, str(ifa.ip)))
        if "p" in ifa_addr_str:
            msg.add_nla(NlAttrIp(IfaAttrType.IFA_ADDRESS, str(peer_ip)))
        if "a" in ifa_local_str:
            msg.add_nla(NlAttrIp(IfaAttrType.IFA_LOCAL, str(ifa.ip)))
        if "p" in ifa_local_str:
            msg.add_nla(NlAttrIp(IfaAttrType.IFA_LOCAL, str(peer_ip)))

        self.send_check_success(msg)
        lst = self.get_ifa_list(iface.ifindex, self.get_family_from_ip(ifa.ip))
        # find_msg_by_ifa() matches on IFA_ADDRESS, which held the peer
        # address when the entry was listed above.  Look up the same key
        # again: searching for the local address could never match and
        # would pass even if the address was still configured.
        rx_msg = self.find_msg_by_ifa(lst, peer_ip)
        assert rx_msg is None
661
662
class TestRtNlAddIfaddrLo(RtnlIfaOps):
    """Address add/delete tests with the "if1" interface type set to "lo"."""

    IFTYPE = "lo"

    def _add_local_and_list(self, ifa):
        """Add ``ifa`` to if1 using IFA_LOCAL only; return the listing for its family."""
        lo_iface = self.vnet.iface_alias_map["if1"]

        add_req = self.create_msg(ifa)
        add_req.add_nla(NlAttrIp(IfaAttrType.IFA_LOCAL, str(ifa.ip)))
        self.send_check_success(add_req)

        return self.get_ifa_list(lo_iface.ifindex, self.get_family_from_ip(ifa.ip))

    @pytest.mark.parametrize(
        "ifa_str",
        [
            pytest.param("192.0.2.1/24", id="prefix"),
            pytest.param("192.0.2.1/32", id="host"),
        ],
    )
    def test_add_4(self, ifa_str):
        """Adds an IPv4 address to the loopback interface and checks the listing"""
        ifa = ipaddress.ip_interface(ifa_str)

        listed = self._add_local_and_list(ifa)
        assert len(listed) == 1
        entry = listed[0]

        # Header fields
        assert entry.base_hdr.ifa_prefixlen == ifa.network.prefixlen
        assert entry.base_hdr.ifa_scope == RtScope.RT_SCOPE_UNIVERSE.value

        # Both address attributes report the address that was added
        assert entry.get_nla(IfaAttrType.IFA_ADDRESS).addr == str(ifa.ip)
        assert entry.get_nla(IfaAttrType.IFA_LOCAL).addr == str(ifa.ip)

    @pytest.mark.parametrize(
        "ifa_str",
        [
            pytest.param("2001:db8::1/64", id="gu_prefix"),
            pytest.param("2001:db8::1/128", id="gu_host"),
        ],
    )
    def test_add_6(self, ifa_str):
        """Adds an IPv6 address to the loopback interface and checks the listing"""
        ifa = ipaddress.ip_interface(ifa_str)

        listed = self._add_local_and_list(ifa)
        # link-local should be auto-created as well
        assert len(listed) == 2
        entry = self.find_msg_by_ifa(listed, ifa.ip)
        assert entry is not None

        assert entry.base_hdr.ifa_prefixlen == ifa.network.prefixlen
        assert entry.base_hdr.ifa_scope == RtScope.RT_SCOPE_UNIVERSE.value
        assert entry.get_nla(IfaAttrType.IFA_ADDRESS).addr == str(ifa.ip)

    @pytest.mark.parametrize(
        "ifa_str",
        [
            pytest.param("192.0.2.1/32", id="ipv4_host"),
            pytest.param("192.0.2.1/24", id="ipv4_prefix"),
            pytest.param("2001:db8::1/64", id="ipv6_gu_prefix"),
            pytest.param("2001:db8::1/128", id="ipv6_gu_host"),
        ],
    )
    @pytest.mark.parametrize(
        "tlv",
        [
            pytest.param("local", id="ifa_local"),
            pytest.param("address", id="ifa_address"),
        ],
    )
    def test_del(self, tlv, ifa_str):
        """Adds an address to the loopback interface, deletes it, checks it is gone"""
        ifa = ipaddress.ip_interface(ifa_str)

        # Step 1: add the address and confirm that it is listed
        listed = self._add_local_and_list(ifa)
        assert self.find_msg_by_ifa(listed, ifa.ip) is not None

        # Step 2: build the delete request for the same family/ifindex/plen
        lo_iface = self.vnet.iface_alias_map["if1"]
        del_req = NetlinkIfaMessage(self.helper, NlRtMsgType.RTM_DELADDR.value)
        del_req.set_request()
        del_req.base_hdr.ifa_family = self.get_family_from_ip(ifa.ip)
        del_req.base_hdr.ifa_index = lo_iface.ifindex
        del_req.base_hdr.ifa_prefixlen = ifa.network.prefixlen

        # The address goes into the attribute named by `tlv`; any other
        # value of `tlv` adds no attribute at all
        attr_by_tlv = {
            "local": IfaAttrType.IFA_LOCAL,
            "address": IfaAttrType.IFA_ADDRESS,
        }
        if tlv in attr_by_tlv:
            del_req.add_nla(NlAttrIp(attr_by_tlv[tlv], str(ifa.ip)))

        # Step 3: the delete must succeed and the address must disappear
        self.send_check_success(del_req)
        remaining = self.get_ifa_list(
            lo_iface.ifindex, self.get_family_from_ip(ifa.ip)
        )
        assert self.find_msg_by_ifa(remaining, ifa.ip) is None
763