1# 2# SPDX-License-Identifier: BSD-2-Clause 3# 4# Copyright (c) 2024 Rubicon Communications, LLC (Netgate) 5# 6# Redistribution and use in source and binary forms, with or without 7# modification, are permitted provided that the following conditions 8# are met: 9# 1. Redistributions of source code must retain the above copyright 10# notice, this list of conditions and the following disclaimer. 11# 2. Redistributions in binary form must reproduce the above copyright 12# notice, this list of conditions and the following disclaimer in the 13# documentation and/or other materials provided with the distribution. 14# 15# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND 16# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 17# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 18# ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE 19# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL 20# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS 21# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) 22# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 23# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY 24# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 25# SUCH DAMAGE. 26 27import ctypes 28import ipaddress 29import pytest 30import re 31import socket 32from utils import DelayedSend 33from atf_python.sys.net.tools import ToolsHelper 34from atf_python.sys.net.vnet import VnetTestTemplate 35 36class TestNAT66(VnetTestTemplate): 37 REQUIRED_MODULES = [ "pf" ] 38 TOPOLOGY = { 39 "vnet1": {"ifaces": ["if1"]}, 40 "vnet2": {"ifaces": ["if1", "if2"]}, 41 "vnet3": {"ifaces": ["if2"]}, 42 "if1": {"prefixes6": [("2001:db8::2/64", "2001:db8::1/64")]}, 43 "if2": {"prefixes6": [("2001:db8:1::1/64", "2001:db8:1::2/64")]}, 44 } 45 46 def vnet2_handler(self, vnet): 47 ifname = vnet.iface_alias_map["if1"].name 48 ToolsHelper.print_output("/sbin/ifconfig %s mtu 9000" % ifname) 49 outifname = vnet.iface_alias_map["if2"].name 50 51 ToolsHelper.print_output("/sbin/pfctl -e") 52 ToolsHelper.pf_rules([ 53 "set reassemble yes", 54 "binat inet6 from 2001:db8::/64 to 2001:db8:1::/64 -> 2001:db8:42::/64", 55 "binat inet6 from 2001:db8:1::/64 to 2001:db8:42::/64 -> 2001:db8::/64", 56 "pass inet6 proto icmp6", 57 "pass in route-to ( %s 2001:db8:1::2 ) inet6 from 2001:db8::3 to 2001:db8:1::/64" % outifname]) 58 59 ToolsHelper.print_output("/sbin/sysctl net.inet6.ip6.forwarding=1") 60 61 def vnet3_handler(self, vnet): 62 ToolsHelper.print_output("/sbin/route add -6 2001:db8:42::/64 2001:db8:1::1") 63 64 def check_icmp_too_big(self, sp, payload_size, frag_size=None, src="2001:db8::2"): 65 packet = sp.IPv6(src=src, dst="2001:db8:1::2") \ 66 / sp.ICMPv6EchoRequest(data=sp.raw(bytes.fromhex('f0') * payload_size)) 67 68 if frag_size is not None: 69 packet = sp.fragment6(packet, frag_size) 70 71 # Delay the send so the sniffer is running when we transmit. 72 s = DelayedSend(packet) 73 74 packets = sp.sniff(iface=self.vnet.iface_alias_map["if1"].name, 75 timeout=3) 76 found=False 77 for p in packets: 78 # We can't get a reply to this 79 assert not p.getlayer(sp.ICMPv6EchoReply) 80 81 if not p.getlayer(sp.ICMPv6PacketTooBig): 82 continue 83 84 ip6 = p.getlayer(sp.IPv6) 85 icmp6 = p.getlayer(sp.ICMPv6PacketTooBig) 86 87 # Error is from the router vnet 88 assert ip6.src == "2001:db8::1" 89 assert ip6.dst == src 90 91 # And the relevant MTU is 1500 92 assert icmp6.mtu == 1500 93 94 # The icmp6 error contains our original IPv6 packet 95 err = icmp6.getlayer(sp.IPerror6) 96 assert err.src == src 97 assert err.dst == "2001:db8:1::2" 98 assert err.nh == 58 99 100 found = True 101 102 assert found 103 104 def check_icmp_echo(self, sp, payload_size, src="2001:db8::2"): 105 packet = sp.IPv6(src=src, dst="2001:db8:1::2") \ 106 / sp.ICMPv6EchoRequest(data=sp.raw(bytes.fromhex('f0') * payload_size)) 107 108 # Delay the send so the sniffer is running when we transmit. 109 s = DelayedSend(packet) 110 111 packets = sp.sniff(iface=self.vnet.iface_alias_map["if1"].name, 112 timeout=3) 113 found=False 114 for p in packets: 115 if not p.getlayer(sp.ICMPv6EchoReply): 116 continue 117 118 ip6 = p.getlayer(sp.IPv6) 119 icmp6 = p.getlayer(sp.ICMPv6EchoReply) 120 121 # Error is from the router vnet 122 assert ip6.src == "2001:db8:1::2" 123 assert ip6.dst == src 124 125 found = True 126 127 assert found 128 129 @pytest.mark.require_user("root") 130 @pytest.mark.require_progs(["scapy"]) 131 def test_npt_icmp(self): 132 cl_vnet = self.vnet_map["vnet1"] 133 ifname = cl_vnet.iface_alias_map["if1"].name 134 ToolsHelper.print_output("/sbin/ifconfig %s mtu 9000" % ifname) 135 136 ToolsHelper.print_output("/sbin/route add -6 2001:db8:1::/64 2001:db8::1") 137 138 # For unclear reasons vnet3 doesn't respond to the first ping. 139 # Just send two for now. 140 ToolsHelper.print_output("/sbin/ping -6 -c 1 2001:db8:1::2") 141 ToolsHelper.print_output("/sbin/ping -6 -c 1 2001:db8:1::2") 142 143 # Import in the correct vnet, so at to not confuse Scapy 144 import scapy.all as sp 145 146 # A ping that easily passes without fragmentation 147 self.check_icmp_echo(sp, 128) 148 149 # Send a ping that just barely doesn't need to be fragmented 150 self.check_icmp_echo(sp, 1452) 151 152 # Send a ping that just barely needs to be fragmented 153 self.check_icmp_too_big(sp, 1453) 154 155 # A ping that arrives fragmented 156 self.check_icmp_too_big(sp, 12000, 5000) 157 158 @pytest.mark.require_user("root") 159 @pytest.mark.require_progs(["scapy"]) 160 def test_npt_route_to_icmp(self): 161 cl_vnet = self.vnet_map["vnet1"] 162 ifname = cl_vnet.iface_alias_map["if1"].name 163 ToolsHelper.print_output("/sbin/ifconfig %s mtu 9000" % ifname) 164 ToolsHelper.print_output("/sbin/ifconfig %s inet6 alias 2001:db8::3/64" % ifname) 165 166 ToolsHelper.print_output("/sbin/route add -6 2001:db8:1::/64 2001:db8::1") 167 168 # For unclear reasons vnet3 doesn't respond to the first ping. 169 # Just send two for now. 170 ToolsHelper.print_output("/sbin/ping -6 -c 1 -S 2001:db8::3 2001:db8:1::2") 171 ToolsHelper.print_output("/sbin/ping -6 -c 1 -S 2001:db8::3 2001:db8:1::2") 172 173 # Import in the correct vnet, so at to not confuse Scapy 174 import scapy.all as sp 175 176 # A ping that easily passes without fragmentation 177 self.check_icmp_echo(sp, 128, src="2001:db8::3") 178 179 # Send a ping that just barely doesn't need to be fragmented 180 self.check_icmp_echo(sp, 1452, src="2001:db8::3") 181 182 # Send a ping that just barely needs to be fragmented 183 self.check_icmp_too_big(sp, 1453, src="2001:db8::3") 184 185 # A ping that arrives fragmented 186 self.check_icmp_too_big(sp, 12000, 5000, src="2001:db8::3") 187