xref: /freebsd/tests/sys/netpfil/pf/nat64.py (revision dd21556857e8d40f66bf5ad54754d9d52669ebf7)
1#
2# SPDX-License-Identifier: BSD-2-Clause
3#
4# Copyright (c) 2024 Rubicon Communications, LLC (Netgate)
5#
6# Redistribution and use in source and binary forms, with or without
7# modification, are permitted provided that the following conditions
8# are met:
9# 1. Redistributions of source code must retain the above copyright
10#    notice, this list of conditions and the following disclaimer.
11# 2. Redistributions in binary form must reproduce the above copyright
12#    notice, this list of conditions and the following disclaimer in the
13#    documentation and/or other materials provided with the distribution.
14#
15# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
16# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
17# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
18# ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
19# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
20# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
21# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
22# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
23# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
24# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
25# SUCH DAMAGE.
26
27import pytest
28import selectors
29import socket
30import sys
31from atf_python.sys.net.tools import ToolsHelper
32from atf_python.sys.net.vnet import VnetTestTemplate
33
class TestNAT64(VnetTestTemplate):
    """Exercise pf's NAT64 (``af-to``) translation across three vnets.

    Topology, as declared in TOPOLOGY:

    * vnet1 -- only ``if1`` (IPv6, 2001:db8::/64).
    * vnet2 -- ``if1`` and ``if2``; enables pf and installs the ``af-to``
      rule that translates inbound IPv6 on ``if1`` to IPv4 sourced from
      192.0.2.1.
    * vnet3 -- only ``if2`` (IPv4, 192.0.2.0/24); runs small TCP and UDP
      responders.

    Every test sends IPv6 packets to a 64:ff9b::/96 mapped address and
    inspects the IPv6 reply.  NOTE(review): the tests presumably execute in
    vnet1 (they route via 2001:db8::1) -- confirm against VnetTestTemplate.
    """

    REQUIRED_MODULES = ["pf"]
    TOPOLOGY = {
        "vnet1": {"ifaces": ["if1"]},
        "vnet2": {"ifaces": ["if1", "if2"]},
        "vnet3": {"ifaces": ["if2"]},
        "if1": {"prefixes6": [("2001:db8::2/64", "2001:db8::1/64")]},
        "if2": {"prefixes4": [("192.0.2.1/24", "192.0.2.2/24")]},
    }

    def vnet3_handler(self, vnet):
        """Configure vnet3 and run the TCP (1234) / UDP (4444) responders.

        Sets three sysctls first: IPv4 forwarding on, default TTL 62
        (test_address_unreachable asserts a hop limit of 62) and UDP
        checksums off (test_udp_checksum expects a non-zero checksum on
        the IPv6 side regardless).

        All sockets, including accepted connections, and the selector are
        closed when the handler leaves, whether by return or by exception.
        """
        ToolsHelper.print_output("/sbin/sysctl net.inet.ip.forwarding=1")
        ToolsHelper.print_output("/sbin/sysctl net.inet.ip.ttl=62")
        ToolsHelper.print_output("/sbin/sysctl net.inet.udp.checksum=0")

        with selectors.DefaultSelector() as sel, \
                socket.socket(socket.AF_INET, socket.SOCK_STREAM) as t, \
                socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as u:
            # Listening TCP socket: data=None marks it as "accept here".
            t.bind(("0.0.0.0", 1234))
            t.setblocking(False)
            t.listen()
            sel.register(t, selectors.EVENT_READ, data=None)

            # UDP socket: data="UDP" marks it for the datagram branch.
            u.bind(("0.0.0.0", 4444))
            u.setblocking(False)
            sel.register(u, selectors.EVENT_READ, data="UDP")

            try:
                self._vnet3_serve(sel)
            finally:
                # Accepted connections are only reachable through the
                # selector's map; close them too.  Closing t/u here as
                # well is harmless, socket.close() is idempotent.
                for key in list(sel.get_map().values()):
                    key.fileobj.close()

    def _vnet3_serve(self, sel):
        """Dispatch selector events until an unrecognised event shows up.

        Accepts TCP connections, answers every UDP datagram and every TCP
        read with b"foo", and returns once an event arrives on an accepted
        connection that is not a read event.
        """
        # Imported locally (like scapy in the tests): the module-level
        # imports do not provide it.
        import types

        while True:
            for key, mask in sel.select(timeout=20):
                sock = key.fileobj
                if key.data is None:
                    conn, addr = sock.accept()
                    print(f"Accepted connection from {addr}")
                    data = types.SimpleNamespace(addr=addr, inb=b"", outb=b"")
                    # Distinct name: do not rebind the list being iterated.
                    conn_events = selectors.EVENT_READ | selectors.EVENT_WRITE
                    sel.register(conn, conn_events, data=data)
                elif key.data == "UDP":
                    recv_data, addr = sock.recvfrom(1024)
                    print(f"Received UDP {recv_data} from {addr}")
                    sock.sendto(b"foo", addr)
                elif mask & selectors.EVENT_READ:
                    recv_data = sock.recv(1024)
                    print(f"Received TCP {recv_data}")
                    sock.send(b"foo")
                else:
                    print("Unknown event?")
                    return

    def vnet2_handler(self, vnet):
        """Configure vnet2: IPv4 default route, enable pf, load the rules.

        The rules pass ICMPv6 and translate IPv6 arriving on ``if1`` to
        IPv4 (``af-to inet``) using 192.0.2.1 as the source address.
        """
        ifname = vnet.iface_alias_map["if1"].name

        ToolsHelper.print_output("/sbin/route add default 192.0.2.2")
        ToolsHelper.print_output("/sbin/pfctl -e")
        ToolsHelper.pf_rules([
            "pass inet6 proto icmp6",
            f"pass in on {ifname} inet6 af-to inet from 192.0.2.1"])

    @pytest.mark.require_user("root")
    def test_tcp_rst(self):
        """A SYN to closed port 1222 yields RST; to port 1234, SYN|ACK."""
        ToolsHelper.print_output("/sbin/route -6 add default 2001:db8::1")

        import scapy.all as sp

        # Send a SYN
        packet = sp.IPv6(dst="64:ff9b::192.0.2.2") \
            / sp.TCP(dport=1222, flags="S")

        # Get a reply
        reply = sp.sr1(packet)
        assert reply is not None

        # We expect to get a RST here.
        tcp = reply.getlayer(sp.TCP)
        assert tcp
        assert "R" in tcp.flags

        # Now try to SYN to an open port
        packet = sp.IPv6(dst="64:ff9b::192.0.2.2") \
            / sp.TCP(dport=1234, flags="S")
        reply = sp.sr1(packet)
        assert reply is not None

        tcp = reply.getlayer(sp.TCP)
        assert tcp

        # We don't get RST
        assert "R" not in tcp.flags

        # We do get SYN|ACK
        assert "S" in tcp.flags
        assert "A" in tcp.flags

    @pytest.mark.require_user("root")
    def test_udp_port_closed(self):
        """UDP to closed port 1222 yields ICMPv6 type 1 code 4, not UDP."""
        ToolsHelper.print_output("/sbin/route -6 add default 2001:db8::1")

        import scapy.all as sp

        packet = sp.IPv6(dst="64:ff9b::192.0.2.2") \
            / sp.UDP(dport=1222) / sp.Raw("bar")
        reply = sp.sr1(packet, timeout=3)
        # sr1() returns None on timeout; fail clearly instead of with an
        # AttributeError below.
        assert reply is not None
        # show() prints the dissection itself and returns None.
        reply.show()

        # We expect an ICMPv6 error, not a UDP reply
        assert not reply.getlayer(sp.UDP)
        icmp = reply.getlayer(sp.ICMPv6DestUnreach)
        assert icmp
        assert icmp.type == 1
        assert icmp.code == 4
        # The error must quote the offending datagram.
        udp = reply.getlayer(sp.UDPerror)
        assert udp
        assert udp.dport == 1222

    @pytest.mark.require_user("root")
    def test_address_unreachable(self):
        """UDP to 198.51.100.3 yields ICMPv6 type 1 code 0, hop limit 62."""
        ToolsHelper.print_output("/sbin/route -6 add default 2001:db8::1")

        import scapy.all as sp

        packet = sp.IPv6(dst="64:ff9b::198.51.100.3") \
            / sp.UDP(dport=1222) / sp.Raw("bar")
        reply = sp.sr1(packet, timeout=3)
        # sr1() returns None on timeout; fail clearly instead of with an
        # AttributeError below.
        assert reply is not None
        # show() prints the dissection itself and returns None.
        reply.show()

        # We expect an ICMPv6 error, not a UDP reply
        assert not reply.getlayer(sp.UDP)
        icmp = reply.getlayer(sp.ICMPv6DestUnreach)
        assert icmp
        assert icmp.type == 1
        assert icmp.code == 0
        # The error must quote the offending datagram.
        udp = reply.getlayer(sp.UDPerror)
        assert udp
        assert udp.dport == 1222

        # Check the hop limit
        ip6 = reply.getlayer(sp.IPv6)
        assert ip6.hlim == 62

    @pytest.mark.require_user("root")
    def test_udp_checksum(self):
        """The translated UDP reply carries a non-zero checksum."""
        ToolsHelper.print_output("/sbin/route -6 add default 2001:db8::1")

        import scapy.all as sp

        # Send an outbound UDP packet to establish state
        packet = sp.IPv6(dst="64:ff9b::192.0.2.2") \
            / sp.UDP(sport=3333, dport=4444) / sp.Raw("foo")

        # Get a reply
        # We'll send the reply without UDP checksum on the IPv4 side
        # but that's not valid for IPv6, so expect pf to update the checksum.
        reply = sp.sr1(packet, timeout=5)
        assert reply is not None

        udp = reply.getlayer(sp.UDP)
        assert udp
        assert udp.chksum != 0
192