xref: /freebsd/tests/sys/netinet6/test_ip6_output.py (revision 1539a657adb4363e7f12d6fd4c1c8ed2e2d842af)
1import errno
2import ipaddress
3import socket
4import struct
5import time
6from ctypes import c_byte
7from ctypes import c_uint
8from ctypes import Structure
9
10import pytest
11from atf_python.sys.net.rtsock import SaHelper
12from atf_python.sys.net.tools import ToolsHelper
13from atf_python.sys.net.vnet import SingleVnetTestTemplate
14from atf_python.sys.net.vnet import VnetTestTemplate
15
16
class In6Pktinfo(Structure):
    """ctypes record holding a 16-byte IPv6 address and an interface index.

    In this file it is serialized with ``bytes()`` as the value of the
    ``IPV6_PKTINFO`` socket option and parsed from received ancillary data.
    """

    _fields_ = [("ipi6_addr", c_byte * 16), ("ipi6_ifindex", c_uint)]
22
23
class VerboseSocketServer:
    """UDP/IPv6 listener that reports how each datagram was addressed.

    The socket enables ``IPV6_RECVPKTINFO`` so that ``recv()`` can return the
    destination address and the receiving interface alongside the payload.
    """

    def __init__(self, ip: str, port: int, ifname: str = None):
        """Create the socket, optionally join a multicast group, and bind it.

        Args:
            ip: textual IPv6 address to listen on (unicast or multicast group).
            port: UDP port to bind.
            ifname: OS interface name.  When given, it supplies the scope id
                for a link-local ``ip`` or the interface on which a multicast
                ``ip`` is joined; it is ignored for other addresses.

        Raises:
            ValueError: if ``ip`` is not a valid IP address.
            OSError: if a socket option, the group join or the bind fails.
        """
        self.ip = ip
        self.port = port

        s = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
        try:
            s.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_RECVPKTINFO, 1)
            addr = ipaddress.ip_address(ip)
            if addr.is_link_local and ifname:
                ifindex = socket.if_nametoindex(ifname)
                addr_tuple = (ip, port, 0, ifindex)
            elif addr.is_multicast and ifname:
                ifindex = socket.if_nametoindex(ifname)
                mreq = socket.inet_pton(socket.AF_INET6, ip) + struct.pack(
                    "I", ifindex
                )
                s.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_JOIN_GROUP, mreq)
                print("## JOINED group {} % {}".format(ip, ifname))
                # Multicast listeners bind to the wildcard address.
                addr_tuple = ("::", port, 0, ifindex)
            else:
                addr_tuple = (ip, port, 0, 0)
            print("## Listening on [{}]:{}".format(addr_tuple[0], port))
            s.bind(addr_tuple)
        except Exception:
            # Do not leak the descriptor when setup fails half-way.
            s.close()
            raise
        self.socket = s

    def recv(self):
        """Receive one datagram and describe it.

        Returns:
            dict with keys ``data`` (payload bytes), ``src_ip`` (sender
            address), ``dst_ip`` (destination address taken from the
            ``IPV6_PKTINFO`` control message) and ``dst_iface`` (name of the
            receiving interface).

        Raises:
            RuntimeError: if no ``IPV6_PKTINFO`` control message was received.
        """
        data, ancdata, msg_flags, address = self.socket.recvmsg(4096, 128)

        # Select the control message by level/type instead of by position.
        pktinfo = None
        for cmsg_level, cmsg_type, cmsg_data in ancdata:
            if (
                cmsg_level == socket.IPPROTO_IPV6
                and cmsg_type == socket.IPV6_PKTINFO
            ):
                pktinfo = cmsg_data
                break
        if pktinfo is None:
            raise RuntimeError(
                "no IPV6_PKTINFO ancillary data received (msg_flags={:#x})".format(
                    msg_flags
                )
            )

        info = In6Pktinfo.from_buffer_copy(pktinfo)
        dst_ip = socket.inet_ntop(socket.AF_INET6, info.ipi6_addr)
        dst_iface = socket.if_indextoname(info.ipi6_ifindex)

        tx_obj = {
            "data": data,
            "src_ip": address[0],
            "dst_ip": dst_ip,
            "dst_iface": dst_iface,
        }
        return tx_obj
63
64
class BaseTestIP6Ouput(VnetTestTemplate):
    """Common topology and listener helper shared by the IPv6 output tests."""

    TOPOLOGY = {
        "vnet1": {"ifaces": ["if1", "if2", "if3"]},
        "vnet2": {"ifaces": ["if1", "if2", "if3"]},
        "if1": {"prefixes6": [("2001:db8:a::1/64", "2001:db8:a::2/64")]},
        "if2": {"prefixes6": [("2001:db8:b::1/64", "2001:db8:b::2/64")]},
        "if3": {"prefixes6": [("2001:db8:c::1/64", "2001:db8:c::2/64")]},
    }
    DEFAULT_PORT = 45365

    def _vnet2_handler(self, vnet, ip: str, os_ifname: str = None):
        """Generic listener: report the first received packet via the pipe.

        Sends the local link-local address data over ``vnet.pipe`` once the
        listener is bound, then sends the first received packet's metadata,
        extended with the alias of the receiving interface.
        """
        linklocals = ToolsHelper.get_linklocals()
        # Bind the listener before announcing anything over the pipe.
        listener = VerboseSocketServer(ip, self.DEFAULT_PORT, os_ifname)
        vnet.pipe.send(linklocals)

        report = listener.recv()
        rx_iface = vnet.iface_map[report["dst_iface"]]
        report["dst_iface_alias"] = rx_iface.alias
        vnet.pipe.send(report)
87
88
class TestIP6Output(BaseTestIP6Ouput):
    """UDP output tests towards the global address on vnet2's end of if2."""

    def vnet2_handler(self, vnet):
        """Listen on the first IPv6 address of if2 and report the first packet."""
        ip = str(vnet.iface_alias_map["if2"].first_ipv6.ip)
        self._vnet2_handler(vnet, ip, None)

    @pytest.mark.require_user("root")
    def test_output6_base(self):
        """Tests simple UDP output"""
        second_vnet = self.vnet_map["vnet2"]

        # Pick target on if2 vnet2's end
        ifaddr = ipaddress.ip_interface(self.TOPOLOGY["if2"]["prefixes6"][0][1])
        ip = str(ifaddr.ip)

        data = bytes("AAAA", "utf-8")
        with socket.socket(
            socket.AF_INET6, socket.SOCK_DGRAM, socket.IPPROTO_UDP
        ) as s:
            print("## TX packet to {},{}".format(ip, self.DEFAULT_PORT))

            # Wait for the child to become ready
            self.wait_object(second_vnet.pipe)
            s.sendto(data, (ip, self.DEFAULT_PORT))

            # Wait for the received object
            rx_obj = self.wait_object(second_vnet.pipe)
        assert rx_obj["dst_ip"] == ip
        assert rx_obj["dst_iface_alias"] == "if2"

    @pytest.mark.require_user("root")
    def test_output6_nhop(self):
        """Tests UDP output with custom nhop set"""
        second_vnet = self.vnet_map["vnet2"]

        # Pick target on if2 vnet2's end
        ifaddr = ipaddress.ip_interface(self.TOPOLOGY["if2"]["prefixes6"][0][1])
        ip_dst = str(ifaddr.ip)
        # Pick nexthop on if1
        ifaddr = ipaddress.ip_interface(self.TOPOLOGY["if1"]["prefixes6"][0][1])
        ip_next = str(ifaddr.ip)
        sin6_next = SaHelper.ip6_sa(ip_next, 0)

        with socket.socket(socket.AF_INET6, socket.SOCK_DGRAM, 0) as s:
            s.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_NEXTHOP, sin6_next)

            # Wait for the child to become ready
            self.wait_object(second_vnet.pipe)
            data = bytes("AAAA", "utf-8")
            s.sendto(data, (ip_dst, self.DEFAULT_PORT))

            # Wait for the received object
            rx_obj = self.wait_object(second_vnet.pipe)
        assert rx_obj["dst_ip"] == ip_dst
        # The packet must arrive via the interface of the forced next hop.
        assert rx_obj["dst_iface_alias"] == "if1"

    @pytest.mark.parametrize(
        "params",
        [
            # src: src-ip, if: src-interface, esrc: expected-src,
            # eif: expected-rx-interface,
            # ex: OSError errno that setsockopt()/sendto() must raise
            pytest.param({"esrc": "2001:db8:b::1", "eif": "if2"}, id="empty"),
            pytest.param(
                {"src": "2001:db8:c::1", "esrc": "2001:db8:c::1", "eif": "if2"},
                id="iponly1",
            ),
            pytest.param(
                {
                    "src": "2001:db8:c::1",
                    "if": "if3",
                    "ex": errno.EHOSTUNREACH,
                },
                id="ipandif",
            ),
            pytest.param(
                {
                    "src": "2001:db8:c::aaaa",
                    "ex": errno.EADDRNOTAVAIL,
                },
                id="nolocalip",
            ),
            pytest.param(
                {"if": "if2", "src": "2001:db8:b::1", "eif": "if2"}, id="ifsame"
            ),
        ],
    )
    @pytest.mark.require_user("root")
    def test_output6_pktinfo(self, params):
        """Tests UDP output with the IPV6_PKTINFO socket option set"""
        second_vnet = self.vnet_map["vnet2"]
        vnet = self.vnet

        # Pick target on if2 vnet2's end
        ifaddr = ipaddress.ip_interface(self.TOPOLOGY["if2"]["prefixes6"][0][1])
        dst_ip = str(ifaddr.ip)

        src_ip = params.get("src", "")
        src_ifname = params.get("if", "")
        expected_ip = params.get("esrc", "")
        expected_ifname = params.get("eif", "")
        # Named so that it does not shadow the errno module.
        expected_errno = params.get("ex", 0)

        pktinfo = In6Pktinfo()
        if src_ip:
            for i, b in enumerate(socket.inet_pton(socket.AF_INET6, src_ip)):
                pktinfo.ipi6_addr[i] = b
        if src_ifname:
            os_ifname = vnet.iface_alias_map[src_ifname].name
            pktinfo.ipi6_ifindex = socket.if_nametoindex(os_ifname)

        # Wait for the child to become ready
        self.wait_object(second_vnet.pipe)
        data = bytes("AAAA", "utf-8")

        with socket.socket(socket.AF_INET6, socket.SOCK_DGRAM, 0) as s:
            try:
                s.setsockopt(
                    socket.IPPROTO_IPV6, socket.IPV6_PKTINFO, bytes(pktinfo)
                )
                s.sendto(data, (dst_ip, self.DEFAULT_PORT))
            except OSError as e:
                if not expected_errno:
                    raise
                assert e.errno == expected_errno
                print("Correctly raised {}".format(e))
                return

            # An expected failure that never happened must fail the test
            # instead of falling through to the receive checks.
            assert not expected_errno, "expected OSError {} was not raised".format(
                errno.errorcode.get(expected_errno, expected_errno)
            )

            # Wait for the received object
            rx_obj = self.wait_object(second_vnet.pipe)

        assert rx_obj["dst_ip"] == dst_ip
        if expected_ip:
            assert rx_obj["src_ip"] == expected_ip
        if expected_ifname:
            assert rx_obj["dst_iface_alias"] == expected_ifname
220
221
class TestIP6OutputLL(BaseTestIP6Ouput):
    """UDP output test towards a link-local destination."""

    def vnet2_handler(self, vnet):
        """Listen on the first link-local address of if2 and report the
        first received packet back to the sender via the pipe.
        """
        ifname = vnet.iface_alias_map["if2"].name
        peer_ll, _ = ToolsHelper.get_linklocals()[ifname][0]
        self._vnet2_handler(vnet, peer_ll, ifname)

    @pytest.mark.require_user("root")
    def test_output6_linklocal(self):
        """Tests UDP output to a link-local address with an explicit scope id"""
        peer = self.vnet_map["vnet2"]

        # The child's first message carries its link-local addresses and
        # doubles as the readiness signal.
        linklocals = self.wait_object(peer.pipe)

        # Link-local address of if2 on the vnet2 side
        peer_ifname = peer.iface_alias_map["if2"].name
        ip, _ = linklocals[peer_ifname][0]
        # Scope id comes from the local end of if2
        scopeid = socket.if_nametoindex(self.vnet.iface_alias_map["if2"].name)

        sock = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
        payload = "AAAA".encode("utf-8")
        target = (ip, self.DEFAULT_PORT, 0, scopeid)
        print("## TX packet to {}%{},{}".format(ip, scopeid, target[1]))

        sock.sendto(payload, target)

        # Metadata of the packet as seen by the listener
        report = self.wait_object(peer.pipe)
        assert report["dst_ip"] == ip
        assert report["dst_iface_alias"] == "if2"
257
258
class TestIP6OutputNhopLL(BaseTestIP6Ouput):
    """UDP output test with a link-local IPV6_NEXTHOP."""

    def vnet2_handler(self, vnet):
        """Listen on the first IPv6 address of if2 and report the first
        received packet back to the sender via the pipe.
        """
        listen_ip = str(vnet.iface_alias_map["if2"].first_ipv6.ip)
        self._vnet2_handler(vnet, listen_ip, None)

    @pytest.mark.require_user("root")
    def test_output6_nhop_linklocal(self):
        """Tests UDP output with custom link-local nhop set"""
        peer = self.vnet_map["vnet2"]

        # The child's first message carries its link-local addresses and
        # doubles as the readiness signal.
        linklocals = self.wait_object(peer.pipe)

        # Destination: vnet2's address on if2
        dst_prefix = self.TOPOLOGY["if2"]["prefixes6"][0][1]
        ip_dst = str(ipaddress.ip_interface(dst_prefix).ip)
        # Next hop: vnet2's link-local address on if1
        peer_if1 = peer.iface_alias_map["if1"].name
        ip_next, _ = linklocals[peer_if1][0]
        # Scope the next hop to the local end of if1
        local_if1 = self.vnet.iface_alias_map["if1"].name
        scopeid = socket.if_nametoindex(local_if1)
        sin6_next = SaHelper.ip6_sa(ip_next, scopeid)

        sock = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM, 0)
        sock.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_NEXTHOP, sin6_next)

        payload = "AAAA".encode("utf-8")
        sock.sendto(payload, (ip_dst, self.DEFAULT_PORT))

        # Metadata of the packet as seen by the listener
        report = self.wait_object(peer.pipe)
        assert report["dst_ip"] == ip_dst
        assert report["dst_iface_alias"] == "if1"
295
296
class TestIP6OutputScope(BaseTestIP6Ouput):
    """UDP output tests mixing link-local and global source/destination."""

    def vnet2_handler(self, vnet):
        """Generic listener that sends first received packet with metadata
        back to the sender via pipe.

        The address/interface to bind on is received from the parent first;
        a missing address means "first link-local address of the interface".
        """
        bind_ip, bind_ifp = self.wait_object(vnet.pipe)
        if bind_ip is None:
            os_ifname = vnet.iface_alias_map[bind_ifp].name
            ll_data = ToolsHelper.get_linklocals()
            bind_ip, _ = ll_data[os_ifname][0]
        if bind_ifp is not None:
            # Translate the topology alias into the OS interface name.
            bind_ifp = vnet.iface_alias_map[bind_ifp].name
        print("## BIND {}%{}".format(bind_ip, bind_ifp))
        self._vnet2_handler(vnet, bind_ip, bind_ifp)

    @pytest.mark.parametrize(
        "params",
        [
            # sif/dif: source/destination interface (for link-local addr)
            # sip/dip: source/destination ip (for non-LL addr)
            # ex: OSError errno that sendto() must raise
            pytest.param({"sif": "if2", "dif": "if2"}, id="same"),
            pytest.param(
                {
                    "sif": "if1",
                    "dif": "if2",
                    "ex": errno.EHOSTUNREACH,
                },
                id="ll_differentif1",
            ),
            pytest.param(
                {
                    "sif": "if1",
                    "dip": "2001:db8:b::2",
                    "ex": errno.EHOSTUNREACH,
                },
                id="ll_differentif2",
            ),
            pytest.param(
                {
                    "sip": "2001:db8:a::1",
                    "dif": "if2",
                },
                id="gu_to_ll",
            ),
        ],
    )
    @pytest.mark.require_user("root")
    def test_output6_linklocal_scope(self, params):
        """Tests UDP output between differently scoped source/destination"""
        second_vnet = self.vnet_map["vnet2"]

        src_ifp = params.get("sif")
        src_ip = params.get("sip")
        dst_ifp = params.get("dif")
        dst_ip = params.get("dip")
        # Named so that it does not shadow the errno module.
        expected_errno = params.get("ex", 0)

        # Send ifp/IP to bind on
        second_vnet.pipe.send((dst_ip, dst_ifp))

        # Wait for the child to become ready
        remote_ll_data = self.wait_object(second_vnet.pipe)

        if dst_ip is None:
            # Pick LL address on dst_ifp vnet2's end
            dst_ip, _ = remote_ll_data[second_vnet.iface_alias_map[dst_ifp].name][0]
            # Get local interface scope
            dst_os_ifname = self.vnet.iface_alias_map[dst_ifp].name
            dst_scopeid = socket.if_nametoindex(dst_os_ifname)
            target = (dst_ip, self.DEFAULT_PORT, 0, dst_scopeid)
        else:
            target = (dst_ip, self.DEFAULT_PORT, 0, 0)

        # Bind
        if src_ip is None:
            local_ll_data = ToolsHelper.get_linklocals()
            src_os_ifname = self.vnet.iface_alias_map[src_ifp].name
            src_ip, _ = local_ll_data[src_os_ifname][0]
            src_scopeid = socket.if_nametoindex(src_os_ifname)
            src = (src_ip, self.DEFAULT_PORT, 0, src_scopeid)
        else:
            src = (src_ip, self.DEFAULT_PORT, 0, 0)

        with socket.socket(
            socket.AF_INET6, socket.SOCK_DGRAM, socket.IPPROTO_UDP
        ) as s:
            s.bind(src)
            data = bytes("AAAA", "utf-8")
            print("## TX packet {} -> {}".format(src, target))

            try:
                s.sendto(data, target)
            except OSError as e:
                if not expected_errno:
                    raise
                assert e.errno == expected_errno
                print("Correctly raised {}".format(e))
                return

            # "ex" means sendto() must raise; succeeding is a failure.
            assert not expected_errno, "expected OSError {} was not raised".format(
                errno.errorcode.get(expected_errno, expected_errno)
            )

            # Wait for the received object
            rx_obj = self.wait_object(second_vnet.pipe)

        assert rx_obj["dst_ip"] == dst_ip
        assert rx_obj["src_ip"] == src_ip
        # NOTE: the receiving interface ("dst_iface_alias") is not checked;
        # that assertion was already disabled in this test.
401
402
class TestIP6OutputMulticast(BaseTestIP6Ouput):
    """UDP output test towards multicast groups of several scopes."""

    def vnet2_handler(self, vnet):
        """Receive the group to join from the parent, then listen on if2."""
        mcast_group = self.wait_object(vnet.pipe)
        ifname = vnet.iface_alias_map["if2"].name
        self._vnet2_handler(vnet, mcast_group, ifname)

    @pytest.mark.parametrize("group_scope", ["ff02", "ff05", "ff08", "ff0e"])
    @pytest.mark.require_user("root")
    def test_output6_multicast(self, group_scope):
        """Tests UDP output to a multicast group via IPV6_MULTICAST_IF"""
        peer = self.vnet_map["vnet2"]

        # Tell the child which group to join
        group = "{}::3456".format(group_scope)
        peer.pipe.send(group)

        # The group itself is the destination; if2 is the outgoing interface
        ip = group
        local_if2 = self.vnet.iface_alias_map["if2"].name
        optval = struct.pack("I", socket.if_nametoindex(local_if2))

        sock = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
        sock.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_MULTICAST_IF, optval)

        payload = "AAAA".encode("utf-8")

        # Block until the child has joined the group and is listening
        self.wait_object(peer.pipe)

        print("## TX packet to {},{}".format(ip, self.DEFAULT_PORT))
        sock.sendto(payload, (ip, self.DEFAULT_PORT))

        # Metadata of the packet as seen by the listener
        report = self.wait_object(peer.pipe)
        assert report["dst_ip"] == ip
        assert report["dst_iface_alias"] == "if2"
439
440
class TestIP6OutputLoopback(SingleVnetTestTemplate):
    """Tests traffic sent by the host to its own IPv6 addresses."""

    IPV6_PREFIXES = ["2001:db8:a::1/64"]
    DEFAULT_PORT = 45365

    def _self_addr(self, scope):
        """Return ``(ip, sockaddr)`` of the local address for ``scope``.

        ``scope`` is "gu" (configured global address), "ll" (first link-local
        address of if1, with its scope id) or "lo" (``::1``, after adding a
        host route for it via lo0).
        """
        if scope == "gu":
            ip = "2001:db8:a::1"
            return ip, (ip, self.DEFAULT_PORT)
        if scope == "ll":
            os_ifname = self.vnet.iface_alias_map["if1"].name
            ifindex = socket.if_nametoindex(os_ifname)
            ll_data = ToolsHelper.get_linklocals()
            ip, _ = ll_data[os_ifname][0]
            return ip, (ip, self.DEFAULT_PORT, 0, ifindex)
        if scope == "lo":
            ip = "::1"
            ToolsHelper.get_output("route add -6 ::1/128 -iface lo0")
            return ip, (ip, self.DEFAULT_PORT)
        pytest.fail("unknown scope: {}".format(scope))

    @pytest.mark.parametrize(
        "source_validation",
        [
            pytest.param(0, id="no_sav"),
            pytest.param(1, id="sav"),
        ],
    )
    @pytest.mark.parametrize("scope", ["gu", "ll", "lo"])
    def test_output6_self_tcp(self, scope, source_validation):
        """Tests IPv6 TCP connection to the local IPv6 address"""

        ToolsHelper.set_sysctl(
            "net.inet6.ip6.source_address_validation", source_validation
        )

        ip, addr_tuple = self._self_addr(scope)
        print("address: {}".format(addr_tuple))

        start = time.perf_counter()
        with socket.socket(
            socket.AF_INET6, socket.SOCK_STREAM, socket.IPPROTO_TCP
        ) as ss:
            ss.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_RECVPKTINFO, 1)
            ss.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
            ss.bind(addr_tuple)
            ss.listen()
            with socket.socket(
                socket.AF_INET6, socket.SOCK_STREAM, socket.IPPROTO_TCP
            ) as s:
                s.settimeout(2.0)
                s.connect(addr_tuple)
                conn, from_addr = ss.accept()
                duration = time.perf_counter() - start
                conn.close()

        assert from_addr[0] == ip
        # The local connection must complete without retransmission delays.
        assert duration < 1.0

    @pytest.mark.parametrize(
        "source_validation",
        [
            pytest.param(0, id="no_sav"),
            pytest.param(1, id="sav"),
        ],
    )
    @pytest.mark.parametrize("scope", ["gu", "ll", "lo"])
    def test_output6_self_udp(self, scope, source_validation):
        """Tests IPv6 UDP datagram delivery to the local IPv6 address"""

        ToolsHelper.set_sysctl(
            "net.inet6.ip6.source_address_validation", source_validation
        )

        ip, addr_tuple = self._self_addr(scope)
        print("address: {}".format(addr_tuple))

        payload = bytes("AAAA", "utf-8")
        start = time.perf_counter()
        # Use datagram sockets so that the UDP path is really exercised.
        with socket.socket(
            socket.AF_INET6, socket.SOCK_DGRAM, socket.IPPROTO_UDP
        ) as ss:
            ss.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_RECVPKTINFO, 1)
            ss.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
            # Fail the test instead of hanging if the datagram is dropped.
            ss.settimeout(2.0)
            ss.bind(addr_tuple)
            with socket.socket(
                socket.AF_INET6, socket.SOCK_DGRAM, socket.IPPROTO_UDP
            ) as s:
                s.settimeout(2.0)
                s.connect(addr_tuple)
                s.send(payload)
                data, from_addr = ss.recvfrom(4096)
                duration = time.perf_counter() - start

        assert data == payload
        assert from_addr[0] == ip
        assert duration < 1.0
540