xref: /freebsd/tests/sys/netpfil/pf/nat64.py (revision 97ca2ada80b870edbbb4f66b26e274cf8e55e0bc)
1#
2# SPDX-License-Identifier: BSD-2-Clause
3#
4# Copyright (c) 2024 Rubicon Communications, LLC (Netgate)
5#
6# Redistribution and use in source and binary forms, with or without
7# modification, are permitted provided that the following conditions
8# are met:
9# 1. Redistributions of source code must retain the above copyright
10#    notice, this list of conditions and the following disclaimer.
11# 2. Redistributions in binary form must reproduce the above copyright
12#    notice, this list of conditions and the following disclaimer in the
13#    documentation and/or other materials provided with the distribution.
14#
15# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
16# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
17# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
18# ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
19# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
20# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
21# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
22# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
23# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
24# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
25# SUCH DAMAGE.
26
27import pytest
28import selectors
29import socket
30import sys
31from atf_python.sys.net.tools import ToolsHelper
32from atf_python.sys.net.vnet import VnetTestTemplate
33
class TestNAT64(VnetTestTemplate):
    """Tests for pf's IPv6-to-IPv4 ("af-to") translation.

    Topology (see TOPOLOGY):
      * vnet1 -- if1 -- vnet2 -- if2 -- vnet3
      * if1 carries the IPv6 prefix 2001:db8::/64, if2 the IPv4 prefix
        192.0.2.0/24.

    vnet2 loads the pf af-to rule, vnet3 runs small TCP/UDP responders on
    the IPv4 side, and every test sends IPv6 packets to 64:ff9b::/96
    addresses through the default route 2001:db8::1.
    NOTE(review): the test methods presumably execute inside vnet1, per the
    VnetTestTemplate convention -- confirm against atf_python.
    """

    REQUIRED_MODULES = [ "pf" ]
    TOPOLOGY = {
        "vnet1": {"ifaces": ["if1"]},
        "vnet2": {"ifaces": ["if1", "if2"]},
        "vnet3": {"ifaces": ["if2"]},
        "if1": {"prefixes6": [("2001:db8::2/64", "2001:db8::1/64")]},
        "if2": {"prefixes4": [("192.0.2.1/24", "192.0.2.2/24")]},
    }

    def vnet3_handler(self, vnet):
        """Run the IPv4-side responders: TCP listener on 1234, UDP on 4444.

        Sets the sysctls the tests depend on, then serves both sockets from
        one selector loop. Every received TCP or UDP payload is answered
        with b"foo". The loop only returns when a TCP connection reports an
        event that is not a read.
        """
        # `types` is not imported at file level; import it here (the tests
        # below import scapy the same way) so accepting a TCP connection
        # does not fail with NameError.
        import types

        ToolsHelper.print_output("/sbin/sysctl net.inet.ip.forwarding=1")
        # test_address_unreachable asserts a hop limit of 62 on the reply.
        ToolsHelper.print_output("/sbin/sysctl net.inet.ip.ttl=62")
        # Replies leave without a UDP checksum; test_udp_checksum checks
        # that the packet arriving on the IPv6 side has one.
        ToolsHelper.print_output("/sbin/sysctl net.inet.udp.checksum=0")

        sel = selectors.DefaultSelector()
        t = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        t.bind(("0.0.0.0", 1234))
        t.setblocking(False)
        t.listen()
        # data=None marks the listening socket.
        sel.register(t, selectors.EVENT_READ, data=None)

        u = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        u.bind(("0.0.0.0", 4444))
        u.setblocking(False)
        # data="UDP" marks the datagram socket.
        sel.register(u, selectors.EVENT_READ, data="UDP")

        while True:
            events = sel.select(timeout=20)
            for key, mask in events:
                sock = key.fileobj
                if key.data is None:
                    conn, addr = sock.accept()
                    print(f"Accepted connection from {addr}")
                    data = types.SimpleNamespace(addr=addr, inb=b"", outb=b"")
                    # Distinct name: do not rebind `events`, which is the
                    # list this loop is iterating over.
                    # NOTE(review): a connected socket is normally writable
                    # right away, so EVENT_WRITE likely sends the next
                    # wakeup to the "Unknown event?" branch and stops the
                    # handler -- confirm this is intended.
                    conn_events = selectors.EVENT_READ | selectors.EVENT_WRITE
                    sel.register(conn, conn_events, data=data)
                elif key.data == "UDP":
                    recv_data, addr = sock.recvfrom(1024)
                    print(f"Received UDP {recv_data} from {addr}")
                    sock.sendto(b"foo", addr)
                else:
                    if mask & selectors.EVENT_READ:
                        recv_data = sock.recv(1024)
                        print(f"Received TCP {recv_data}")
                        sock.send(b"foo")
                    else:
                        print("Unknown event?")
                        t.close()
                        u.close()
                        return

    def vnet2_handler(self, vnet):
        """Configure the translator: IPv4 default route plus pf rules.

        Enables pf, passes ICMPv6, and installs an af-to rule on the if1
        interface that uses 192.0.2.1 as the IPv4 source address.
        """
        ifname = vnet.iface_alias_map["if1"].name

        ToolsHelper.print_output("/sbin/route add default 192.0.2.2")
        ToolsHelper.print_output("/sbin/pfctl -e")
        ToolsHelper.pf_rules([
            "pass inet6 proto icmp6",
            "pass in on %s inet6 af-to inet from 192.0.2.1" % ifname])

    @pytest.mark.require_user("root")
    @pytest.mark.require_progs(["scapy"])
    def test_tcp_rst(self):
        """A SYN to a closed port yields RST; to the open port, SYN|ACK."""
        ToolsHelper.print_output("/sbin/route -6 add default 2001:db8::1")

        import scapy.all as sp

        # Send a SYN
        packet = sp.IPv6(dst="64:ff9b::192.0.2.2") \
            / sp.TCP(dport=1222, flags="S")

        # Get a reply
        reply = sp.sr1(packet)

        # We expect to get a RST here.
        tcp = reply.getlayer(sp.TCP)
        assert tcp
        assert "R" in tcp.flags

        # Now try to SYN to an open port
        packet = sp.IPv6(dst="64:ff9b::192.0.2.2") \
            / sp.TCP(dport=1234, flags="S")
        reply = sp.sr1(packet)

        tcp = reply.getlayer(sp.TCP)
        assert tcp

        # We don't get RST
        assert "R" not in tcp.flags

        # We do get SYN|ACK
        assert "S" in tcp.flags
        assert "A" in tcp.flags

    @pytest.mark.require_user("root")
    @pytest.mark.require_progs(["scapy"])
    def test_udp_port_closed(self):
        """UDP to a closed port yields ICMPv6 unreachable, code 4."""
        ToolsHelper.print_output("/sbin/route -6 add default 2001:db8::1")

        import scapy.all as sp

        packet = sp.IPv6(dst="64:ff9b::192.0.2.2") \
            / sp.UDP(dport=1222) / sp.Raw("bar")
        reply = sp.sr1(packet, timeout=3)
        # sr1() returns None on timeout; fail with a clear message rather
        # than an AttributeError below.
        assert reply is not None, "no reply received"
        # show() prints the packet itself; wrapping it in print() only
        # added a stray "None" line.
        reply.show()

        # We expect an ICMPv6 error, not a UDP reply
        assert not reply.getlayer(sp.UDP)
        icmp = reply.getlayer(sp.ICMPv6DestUnreach)
        assert icmp
        assert icmp.type == 1
        assert icmp.code == 4
        udp = reply.getlayer(sp.UDPerror)
        assert udp
        assert udp.dport == 1222

    @pytest.mark.require_user("root")
    @pytest.mark.require_progs(["scapy"])
    def test_address_unreachable(self):
        """UDP to 64:ff9b::198.51.100.3 yields ICMPv6 unreachable, code 0.

        Also checks that the ICMPv6 error arrives with hop limit 62.
        """
        ToolsHelper.print_output("/sbin/route -6 add default 2001:db8::1")

        import scapy.all as sp

        packet = sp.IPv6(dst="64:ff9b::198.51.100.3") \
            / sp.UDP(dport=1222) / sp.Raw("bar")
        reply = sp.sr1(packet, timeout=3)
        # sr1() returns None on timeout; fail with a clear message rather
        # than an AttributeError below.
        assert reply is not None, "no reply received"
        # show() prints the packet itself; wrapping it in print() only
        # added a stray "None" line.
        reply.show()

        # We expect an ICMPv6 error, not a UDP reply
        assert not reply.getlayer(sp.UDP)
        icmp = reply.getlayer(sp.ICMPv6DestUnreach)
        assert icmp
        assert icmp.type == 1
        assert icmp.code == 0
        udp = reply.getlayer(sp.UDPerror)
        assert udp
        assert udp.dport == 1222

        # Check the hop limit
        ip6 = reply.getlayer(sp.IPv6)
        assert ip6.hlim == 62

    @pytest.mark.require_user("root")
    @pytest.mark.require_progs(["scapy"])
    def test_udp_checksum(self):
        """A UDP reply sent without a checksum arrives with a non-zero one."""
        ToolsHelper.print_output("/sbin/route -6 add default 2001:db8::1")

        import scapy.all as sp

        # Send an outbound UDP packet to establish state
        packet = sp.IPv6(dst="64:ff9b::192.0.2.2") \
            / sp.UDP(sport=3333, dport=4444) / sp.Raw("foo")

        # Get a reply
        # We'll send the reply without UDP checksum on the IPv4 side
        # but that's not valid for IPv6, so expect pf to update the checksum.
        reply = sp.sr1(packet, timeout=5)
        # sr1() returns None on timeout; fail with a clear message rather
        # than an AttributeError below.
        assert reply is not None, "no reply received"

        udp = reply.getlayer(sp.UDP)
        assert udp
        assert udp.chksum != 0
196