xref: /freebsd/tests/sys/netpfil/pf/nat66.py (revision 4f35a84b32412f5cf54e08cd97cd6eee407fb30e)
1#
2# SPDX-License-Identifier: BSD-2-Clause
3#
4# Copyright (c) 2024 Rubicon Communications, LLC (Netgate)
5#
6# Redistribution and use in source and binary forms, with or without
7# modification, are permitted provided that the following conditions
8# are met:
9# 1. Redistributions of source code must retain the above copyright
10#    notice, this list of conditions and the following disclaimer.
11# 2. Redistributions in binary form must reproduce the above copyright
12#    notice, this list of conditions and the following disclaimer in the
13#    documentation and/or other materials provided with the distribution.
14#
15# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
16# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
17# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
18# ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
19# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
20# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
21# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
22# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
23# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
24# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
25# SUCH DAMAGE.
26
27import ctypes
28import ipaddress
29import pytest
30import re
31import socket
32from utils import DelayedSend
33from atf_python.sys.net.tools import ToolsHelper
34from atf_python.sys.net.vnet import VnetTestTemplate
35
36class TestNAT66(VnetTestTemplate):
37    REQUIRED_MODULES = [ "pf" ]
38    TOPOLOGY = {
39        "vnet1": {"ifaces": ["if1"]},
40        "vnet2": {"ifaces": ["if1", "if2"]},
41        "vnet3": {"ifaces": ["if2"]},
42        "if1": {"prefixes6": [("2001:db8::2/64", "2001:db8::1/64")], "mtu": 9000},
43        "if2": {"prefixes6": [("2001:db8:1::1/64", "2001:db8:1::2/64")]},
44    }
45
46    def vnet2_handler(self, vnet):
47        outifname = vnet.iface_alias_map["if2"].name
48
49        ToolsHelper.print_output("/sbin/pfctl -e")
50        ToolsHelper.pf_rules([
51            "set reassemble yes",
52            "binat inet6 from 2001:db8::/64 to 2001:db8:1::/64 -> 2001:db8:42::/64",
53            "binat inet6 from 2001:db8:1::/64 to 2001:db8:42::/64 -> 2001:db8::/64",
54            "pass inet6 proto icmp6",
55            "pass in route-to ( %s 2001:db8:1::2 ) inet6 from 2001:db8::3 to 2001:db8:1::/64" % outifname])
56
57        ToolsHelper.print_output("/sbin/sysctl net.inet6.ip6.forwarding=1")
58
59    def vnet3_handler(self, vnet):
60        ToolsHelper.print_output("/sbin/route add -6 2001:db8:42::/64 2001:db8:1::1")
61
62    def check_icmp_too_big(self, sp, payload_size, frag_size=None, src="2001:db8::2"):
63        packet = sp.IPv6(src=src, dst="2001:db8:1::2") \
64            / sp.ICMPv6EchoRequest(data=sp.raw(bytes.fromhex('f0') * payload_size))
65
66        if frag_size is not None:
67            packet = sp.fragment6(packet, frag_size)
68
69        # Delay the send so the sniffer is running when we transmit.
70        s = DelayedSend(packet)
71
72        packets = sp.sniff(iface=self.vnet.iface_alias_map["if1"].name,
73            timeout=3)
74        found=False
75        for p in packets:
76            # We can't get a reply to this
77            assert not p.getlayer(sp.ICMPv6EchoReply)
78
79            if not p.getlayer(sp.ICMPv6PacketTooBig):
80                continue
81
82            ip6 = p.getlayer(sp.IPv6)
83            icmp6 = p.getlayer(sp.ICMPv6PacketTooBig)
84
85            # Error is from the router vnet
86            assert ip6.src == "2001:db8::1"
87            assert ip6.dst == src
88
89            # And the relevant MTU is 1500
90            assert icmp6.mtu == 1500
91
92            # The icmp6 error contains our original IPv6 packet
93            err = icmp6.getlayer(sp.IPerror6)
94            assert err.src == src
95            assert err.dst == "2001:db8:1::2"
96            assert err.nh == 58
97
98            found = True
99
100        assert found
101
102    def check_icmp_echo(self, sp, payload_size, src="2001:db8::2"):
103        packet = sp.IPv6(src=src, dst="2001:db8:1::2") \
104            / sp.ICMPv6EchoRequest(data=sp.raw(bytes.fromhex('f0') * payload_size))
105
106        # Delay the send so the sniffer is running when we transmit.
107        s = DelayedSend(packet)
108
109        packets = sp.sniff(iface=self.vnet.iface_alias_map["if1"].name,
110            timeout=3)
111        found=False
112        for p in packets:
113            if not p.getlayer(sp.ICMPv6EchoReply):
114                continue
115
116            ip6 = p.getlayer(sp.IPv6)
117            icmp6 = p.getlayer(sp.ICMPv6EchoReply)
118
119            # Error is from the router vnet
120            assert ip6.src == "2001:db8:1::2"
121            assert ip6.dst == src
122
123            found = True
124
125        assert found
126
127    @pytest.mark.require_user("root")
128    @pytest.mark.require_progs(["scapy"])
129    def test_npt_icmp(self):
130        cl_vnet = self.vnet_map["vnet1"]
131
132        ToolsHelper.print_output("/sbin/route add -6 2001:db8:1::/64 2001:db8::1")
133
134        # For unclear reasons vnet3 doesn't respond to the first ping.
135        # Just send two for now.
136        ToolsHelper.print_output("/sbin/ping -6 -c 1 2001:db8:1::2")
137        ToolsHelper.print_output("/sbin/ping -6 -c 1 2001:db8:1::2")
138
139        # Import in the correct vnet, so at to not confuse Scapy
140        import scapy.all as sp
141
142        # A ping that easily passes without fragmentation
143        self.check_icmp_echo(sp, 128)
144
145        # Send a ping that just barely doesn't need to be fragmented
146        self.check_icmp_echo(sp, 1452)
147
148        # Send a ping that just barely needs to be fragmented
149        self.check_icmp_too_big(sp, 1453)
150
151        # A ping that arrives fragmented
152        self.check_icmp_too_big(sp, 12000, 5000)
153
154    @pytest.mark.require_user("root")
155    @pytest.mark.require_progs(["scapy"])
156    def test_npt_route_to_icmp(self):
157        cl_vnet = self.vnet_map["vnet1"]
158        ifname = cl_vnet.iface_alias_map["if1"].name
159        ToolsHelper.print_output("/sbin/ifconfig %s inet6 alias 2001:db8::3/64" % ifname)
160
161        ToolsHelper.print_output("/sbin/route add -6 2001:db8:1::/64 2001:db8::1")
162
163        # For unclear reasons vnet3 doesn't respond to the first ping.
164        # Just send two for now.
165        ToolsHelper.print_output("/sbin/ping -6 -c 1 -S 2001:db8::3 2001:db8:1::2")
166        ToolsHelper.print_output("/sbin/ping -6 -c 1 -S 2001:db8::3 2001:db8:1::2")
167
168        # Import in the correct vnet, so at to not confuse Scapy
169        import scapy.all as sp
170
171        # A ping that easily passes without fragmentation
172        self.check_icmp_echo(sp, 128, src="2001:db8::3")
173
174        # Send a ping that just barely doesn't need to be fragmented
175        self.check_icmp_echo(sp, 1452, src="2001:db8::3")
176
177        # Send a ping that just barely needs to be fragmented
178        self.check_icmp_too_big(sp, 1453, src="2001:db8::3")
179
180        # A ping that arrives fragmented
181        self.check_icmp_too_big(sp, 12000, 5000, src="2001:db8::3")
182