1# 2# SPDX-License-Identifier: BSD-2-Clause 3# 4# Copyright (c) 2024 Rubicon Communications, LLC (Netgate) 5# 6# Redistribution and use in source and binary forms, with or without 7# modification, are permitted provided that the following conditions 8# are met: 9# 1. Redistributions of source code must retain the above copyright 10# notice, this list of conditions and the following disclaimer. 11# 2. Redistributions in binary form must reproduce the above copyright 12# notice, this list of conditions and the following disclaimer in the 13# documentation and/or other materials provided with the distribution. 14# 15# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND 16# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 17# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 18# ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE 19# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL 20# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS 21# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) 22# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 23# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY 24# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 25# SUCH DAMAGE. 26 27import ctypes 28import ipaddress 29import pytest 30import re 31import socket 32from utils import DelayedSend 33from atf_python.sys.net.tools import ToolsHelper 34from atf_python.sys.net.vnet import VnetTestTemplate 35 36class TestNAT66(VnetTestTemplate): 37 REQUIRED_MODULES = [ "pf" ] 38 TOPOLOGY = { 39 "vnet1": {"ifaces": ["if1"]}, 40 "vnet2": {"ifaces": ["if1", "if2"]}, 41 "vnet3": {"ifaces": ["if2"]}, 42 "if1": {"prefixes6": [("2001:db8::2/64", "2001:db8::1/64")], "mtu": 9000}, 43 "if2": {"prefixes6": [("2001:db8:1::1/64", "2001:db8:1::2/64")]}, 44 } 45 46 def vnet2_handler(self, vnet): 47 outifname = vnet.iface_alias_map["if2"].name 48 49 ToolsHelper.print_output("/sbin/pfctl -e") 50 ToolsHelper.pf_rules([ 51 "set reassemble yes", 52 "binat inet6 from 2001:db8::/64 to 2001:db8:1::/64 -> 2001:db8:42::/64", 53 "binat inet6 from 2001:db8:1::/64 to 2001:db8:42::/64 -> 2001:db8::/64", 54 "pass inet6 proto icmp6", 55 "pass in route-to ( %s 2001:db8:1::2 ) inet6 from 2001:db8::3 to 2001:db8:1::/64" % outifname]) 56 57 ToolsHelper.print_output("/sbin/sysctl net.inet6.ip6.forwarding=1") 58 59 def vnet3_handler(self, vnet): 60 ToolsHelper.print_output("/sbin/route add -6 2001:db8:42::/64 2001:db8:1::1") 61 62 def check_icmp_too_big(self, sp, payload_size, frag_size=None, src="2001:db8::2"): 63 packet = sp.IPv6(src=src, dst="2001:db8:1::2") \ 64 / sp.ICMPv6EchoRequest(data=sp.raw(bytes.fromhex('f0') * payload_size)) 65 66 if frag_size is not None: 67 packet = sp.fragment6(packet, frag_size) 68 69 # Delay the send so the sniffer is running when we transmit. 70 s = DelayedSend(packet) 71 72 packets = sp.sniff(iface=self.vnet.iface_alias_map["if1"].name, 73 timeout=3) 74 found=False 75 for p in packets: 76 # We can't get a reply to this 77 assert not p.getlayer(sp.ICMPv6EchoReply) 78 79 if not p.getlayer(sp.ICMPv6PacketTooBig): 80 continue 81 82 ip6 = p.getlayer(sp.IPv6) 83 icmp6 = p.getlayer(sp.ICMPv6PacketTooBig) 84 85 # Error is from the router vnet 86 assert ip6.src == "2001:db8::1" 87 assert ip6.dst == src 88 89 # And the relevant MTU is 1500 90 assert icmp6.mtu == 1500 91 92 # The icmp6 error contains our original IPv6 packet 93 err = icmp6.getlayer(sp.IPerror6) 94 assert err.src == src 95 assert err.dst == "2001:db8:1::2" 96 assert err.nh == 58 97 98 found = True 99 100 assert found 101 102 def check_icmp_echo(self, sp, payload_size, src="2001:db8::2"): 103 packet = sp.IPv6(src=src, dst="2001:db8:1::2") \ 104 / sp.ICMPv6EchoRequest(data=sp.raw(bytes.fromhex('f0') * payload_size)) 105 106 # Delay the send so the sniffer is running when we transmit. 107 s = DelayedSend(packet) 108 109 packets = sp.sniff(iface=self.vnet.iface_alias_map["if1"].name, 110 timeout=3) 111 found=False 112 for p in packets: 113 if not p.getlayer(sp.ICMPv6EchoReply): 114 continue 115 116 ip6 = p.getlayer(sp.IPv6) 117 icmp6 = p.getlayer(sp.ICMPv6EchoReply) 118 119 # Error is from the router vnet 120 assert ip6.src == "2001:db8:1::2" 121 assert ip6.dst == src 122 123 found = True 124 125 assert found 126 127 @pytest.mark.require_user("root") 128 @pytest.mark.require_progs(["scapy"]) 129 def test_npt_icmp(self): 130 cl_vnet = self.vnet_map["vnet1"] 131 132 ToolsHelper.print_output("/sbin/route add -6 2001:db8:1::/64 2001:db8::1") 133 134 # For unclear reasons vnet3 doesn't respond to the first ping. 135 # Just send two for now. 136 ToolsHelper.print_output("/sbin/ping -6 -c 1 2001:db8:1::2") 137 ToolsHelper.print_output("/sbin/ping -6 -c 1 2001:db8:1::2") 138 139 # Import in the correct vnet, so at to not confuse Scapy 140 import scapy.all as sp 141 142 # A ping that easily passes without fragmentation 143 self.check_icmp_echo(sp, 128) 144 145 # Send a ping that just barely doesn't need to be fragmented 146 self.check_icmp_echo(sp, 1452) 147 148 # Send a ping that just barely needs to be fragmented 149 self.check_icmp_too_big(sp, 1453) 150 151 # A ping that arrives fragmented 152 self.check_icmp_too_big(sp, 12000, 5000) 153 154 @pytest.mark.require_user("root") 155 @pytest.mark.require_progs(["scapy"]) 156 def test_npt_route_to_icmp(self): 157 cl_vnet = self.vnet_map["vnet1"] 158 ifname = cl_vnet.iface_alias_map["if1"].name 159 ToolsHelper.print_output("/sbin/ifconfig %s inet6 alias 2001:db8::3/64" % ifname) 160 161 ToolsHelper.print_output("/sbin/route add -6 2001:db8:1::/64 2001:db8::1") 162 163 # For unclear reasons vnet3 doesn't respond to the first ping. 164 # Just send two for now. 165 ToolsHelper.print_output("/sbin/ping -6 -c 1 -S 2001:db8::3 2001:db8:1::2") 166 ToolsHelper.print_output("/sbin/ping -6 -c 1 -S 2001:db8::3 2001:db8:1::2") 167 168 # Import in the correct vnet, so at to not confuse Scapy 169 import scapy.all as sp 170 171 # A ping that easily passes without fragmentation 172 self.check_icmp_echo(sp, 128, src="2001:db8::3") 173 174 # Send a ping that just barely doesn't need to be fragmented 175 self.check_icmp_echo(sp, 1452, src="2001:db8::3") 176 177 # Send a ping that just barely needs to be fragmented 178 self.check_icmp_too_big(sp, 1453, src="2001:db8::3") 179 180 # A ping that arrives fragmented 181 self.check_icmp_too_big(sp, 12000, 5000, src="2001:db8::3") 182