xref: /freebsd/tests/sys/netpfil/pf/nat64.py (revision 6e7f24e0a5262d7e040f4f6e9167c544e006176d)
1#
2# SPDX-License-Identifier: BSD-2-Clause
3#
4# Copyright (c) 2024 Rubicon Communications, LLC (Netgate)
5#
6# Redistribution and use in source and binary forms, with or without
7# modification, are permitted provided that the following conditions
8# are met:
9# 1. Redistributions of source code must retain the above copyright
10#    notice, this list of conditions and the following disclaimer.
11# 2. Redistributions in binary form must reproduce the above copyright
12#    notice, this list of conditions and the following disclaimer in the
13#    documentation and/or other materials provided with the distribution.
14#
15# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
16# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
17# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
18# ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
19# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
20# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
21# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
22# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
23# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
24# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
25# SUCH DAMAGE.
26
27import pytest
28import selectors
29import socket
30import sys
31import threading
32import time
33from atf_python.sys.net.tools import ToolsHelper
34from atf_python.sys.net.vnet import VnetTestTemplate
35
36class DelayedSend(threading.Thread):
37    def __init__(self, packet):
38        threading.Thread.__init__(self)
39        self._packet = packet
40
41        self.start()
42
43    def run(self):
44        import scapy.all as sp
45        time.sleep(1)
46        sp.send(self._packet)
47
48class TestNAT64(VnetTestTemplate):
49    REQUIRED_MODULES = [ "pf" ]
50    TOPOLOGY = {
51        "vnet1": {"ifaces": ["if1"]},
52        "vnet2": {"ifaces": ["if1", "if2"]},
53        "vnet3": {"ifaces": ["if2", "if3"]},
54        "vnet4": {"ifaces": ["if3"]},
55        "if1": {"prefixes6": [("2001:db8::2/64", "2001:db8::1/64")]},
56        "if2": {"prefixes4": [("192.0.2.1/24", "192.0.2.2/24")]},
57        "if3": {"prefixes4": [("198.51.100.1/24", "198.51.100.2/24")]}
58    }
59
60    def vnet4_handler(self, vnet):
61        ToolsHelper.print_output("/sbin/route add default 198.51.100.1")
62
63    def vnet3_handler(self, vnet):
64        ToolsHelper.print_output("/sbin/sysctl net.inet.ip.forwarding=1")
65        ToolsHelper.print_output("/sbin/sysctl net.inet.ip.ttl=62")
66        ToolsHelper.print_output("/sbin/sysctl net.inet.udp.checksum=0")
67
68        sel = selectors.DefaultSelector()
69        t = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
70        t.bind(("0.0.0.0", 1234))
71        t.setblocking(False)
72        t.listen()
73        sel.register(t, selectors.EVENT_READ, data=None)
74
75        u = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
76        u.bind(("0.0.0.0", 4444))
77        u.setblocking(False)
78        sel.register(u, selectors.EVENT_READ, data="UDP")
79
80        while True:
81            events = sel.select(timeout=20)
82            for key, mask in events:
83                sock = key.fileobj
84                if key.data is None:
85                    conn, addr = sock.accept()
86                    print(f"Accepted connection from {addr}")
87                    data = types.SimpleNamespace(addr=addr, inb=b"", outb=b"")
88                    events = selectors.EVENT_READ | selectors.EVENT_WRITE
89                    sel.register(conn, events, data=data)
90                elif key.data == "UDP":
91                    recv_data, addr = sock.recvfrom(1024)
92                    print(f"Received UDP {recv_data} from {addr}")
93                    sock.sendto(b"foo", addr)
94                else:
95                    if mask & selectors.EVENT_READ:
96                        recv_data = sock.recv(1024)
97                        print(f"Received TCP {recv_data}")
98                        sock.send(b"foo")
99                    else:
100                        print("Unknown event?")
101                        t.close()
102                        u.close()
103                        return
104
105    def vnet2_handler(self, vnet):
106        ifname = vnet.iface_alias_map["if1"].name
107
108        ToolsHelper.print_output("/sbin/route add default 192.0.2.2")
109        ToolsHelper.print_output("/sbin/pfctl -e")
110        ToolsHelper.pf_rules([
111            "pass inet6 proto icmp6",
112            "pass in on %s inet6 af-to inet from 192.0.2.1" % ifname])
113
114    @pytest.mark.require_user("root")
115    @pytest.mark.require_progs(["scapy"])
116    def test_tcp_rst(self):
117        ToolsHelper.print_output("/sbin/route -6 add default 2001:db8::1")
118
119        import scapy.all as sp
120
121        # Send a SYN
122        packet = sp.IPv6(dst="64:ff9b::192.0.2.2") \
123            / sp.TCP(dport=1222, flags="S")
124
125        # Get a reply
126        reply = sp.sr1(packet)
127
128        # We expect to get a RST here.
129        tcp = reply.getlayer(sp.TCP)
130        assert tcp
131        assert "R" in tcp.flags
132
133        # Now try to SYN to an open port
134        packet = sp.IPv6(dst="64:ff9b::192.0.2.2") \
135            / sp.TCP(dport=1234, flags="S")
136        reply = sp.sr1(packet)
137
138        tcp = reply.getlayer(sp.TCP)
139        assert tcp
140
141        # We don't get RST
142        assert "R" not in tcp.flags
143
144        # We do get SYN|ACK
145        assert "S" in tcp.flags
146        assert "A" in tcp.flags
147
148    @pytest.mark.require_user("root")
149    @pytest.mark.require_progs(["scapy"])
150    def test_udp_port_closed(self):
151        ToolsHelper.print_output("/sbin/route -6 add default 2001:db8::1")
152
153        import scapy.all as sp
154
155        packet = sp.IPv6(dst="64:ff9b::192.0.2.2") \
156            / sp.UDP(dport=1222) / sp.Raw("bar")
157        reply = sp.sr1(packet, timeout=3)
158        print(reply.show())
159
160        # We expect an ICMPv6 error, not a UDP reply
161        assert not reply.getlayer(sp.UDP)
162        icmp = reply.getlayer(sp.ICMPv6DestUnreach)
163        assert icmp
164        assert icmp.type == 1
165        assert icmp.code == 4
166        udp = reply.getlayer(sp.UDPerror)
167        assert udp
168        assert udp.dport == 1222
169
170    @pytest.mark.require_user("root")
171    @pytest.mark.require_progs(["scapy"])
172    def test_address_unreachable(self):
173        ToolsHelper.print_output("/sbin/route -6 add default 2001:db8::1")
174
175        import scapy.all as sp
176
177        packet = sp.IPv6(dst="64:ff9b::203.0.113.2") \
178            / sp.UDP(dport=1222) / sp.Raw("bar")
179        reply = sp.sr1(packet, timeout=3)
180        print(reply.show())
181
182        # We expect an ICMPv6 error, not a UDP reply
183        assert not reply.getlayer(sp.UDP)
184        icmp = reply.getlayer(sp.ICMPv6DestUnreach)
185        assert icmp
186        assert icmp.type == 1
187        assert icmp.code == 0
188        udp = reply.getlayer(sp.UDPerror)
189        assert udp
190        assert udp.dport == 1222
191
192        # Check the hop limit
193        ip6 = reply.getlayer(sp.IPv6)
194        assert ip6.hlim == 62
195
196    @pytest.mark.require_user("root")
197    @pytest.mark.require_progs(["scapy"])
198    def test_udp_checksum(self):
199        ToolsHelper.print_output("/sbin/route -6 add default 2001:db8::1")
200
201        import scapy.all as sp
202
203        # Send an outbound UDP packet to establish state
204        packet = sp.IPv6(dst="64:ff9b::192.0.2.2") \
205            / sp.UDP(sport=3333, dport=4444) / sp.Raw("foo")
206
207        # Get a reply
208        # We'll send the reply without UDP checksum on the IPv4 side
209        # but that's not valid for IPv6, so expect pf to update the checksum.
210        reply = sp.sr1(packet, timeout=5)
211
212        udp = reply.getlayer(sp.UDP)
213        assert udp
214        assert udp.chksum != 0
215
216    def common_test_source_addr(self, packet):
217        vnet = self.vnet_map["vnet1"]
218        sendif = vnet.iface_alias_map["if1"].name
219
220        import scapy.all as sp
221
222        print("Outbound:\n")
223        packet.show()
224
225        s = DelayedSend(packet)
226
227        # We expect an ICMPv6 error here, where we'll verify the source address of
228        # the outer packet
229        packets = sp.sniff(iface=sendif, timeout=5)
230
231        for reply in packets:
232            print("Reply:\n")
233            reply.show()
234            icmp = reply.getlayer(sp.ICMPv6TimeExceeded)
235            if not icmp:
236                continue
237
238            ip = reply.getlayer(sp.IPv6)
239            assert icmp
240            assert ip.src == "64:ff9b::c000:202"
241            return reply
242
243        # If we don't find the packet we expect to see
244        assert False
245
246    @pytest.mark.require_user("root")
247    @pytest.mark.require_progs(["scapy"])
248    def test_source_addr_tcp(self):
249        ToolsHelper.print_output("/sbin/route -6 add default 2001:db8::1")
250        import scapy.all as sp
251
252        packet = sp.IPv6(dst="64:ff9b::198.51.100.2", hlim=1) \
253            / sp.TCP(sport=1111, dport=2222, flags="S")
254        self.common_test_source_addr(packet)
255
256    @pytest.mark.require_user("root")
257    @pytest.mark.require_progs(["scapy"])
258    def test_source_addr_udp(self):
259        ToolsHelper.print_output("/sbin/route -6 add default 2001:db8::1")
260        import scapy.all as sp
261
262        packet = sp.IPv6(dst="64:ff9b::198.51.100.2", hlim=1) \
263            / sp.UDP(sport=1111, dport=2222) / sp.Raw("foo")
264        self.common_test_source_addr(packet)
265
266    @pytest.mark.require_user("root")
267    @pytest.mark.require_progs(["scapy"])
268    def test_source_addr_sctp(self):
269        ToolsHelper.print_output("/sbin/route -6 add default 2001:db8::1")
270        import scapy.all as sp
271
272        packet = sp.IPv6(dst="64:ff9b::198.51.100.2", hlim=1) \
273            / sp.SCTP(sport=1111, dport=2222) \
274            / sp.SCTPChunkInit(init_tag=1, n_in_streams=1, n_out_streams=1, a_rwnd=1500)
275        self.common_test_source_addr(packet)
276
277    @pytest.mark.require_user("root")
278    @pytest.mark.require_progs(["scapy"])
279    def test_source_addr_icmp(self):
280        ToolsHelper.print_output("/sbin/route -6 add default 2001:db8::1")
281        import scapy.all as sp
282
283        packet = sp.IPv6(dst="64:ff9b::198.51.100.2", hlim=1) \
284            / sp.ICMPv6EchoRequest() / sp.Raw("foo")
285        reply = self.common_test_source_addr(packet)
286        icmp = reply.getlayer(sp.ICMPv6EchoRequest)
287        assert icmp
288