#
# SPDX-License-Identifier: BSD-2-Clause
#
# Copyright (c) 2025 Rubicon Communications, LLC (Netgate)
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
# 1. Redistributions of source code must retain the above copyright
#    notice, this list of conditions and the following disclaimer.
# 2. Redistributions in binary form must reproduce the above copyright
#    notice, this list of conditions and the following disclaimer in the
#    documentation and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
# SUCH DAMAGE.
import pytest
import re
from utils import DelayedSend
from atf_python.sys.net.tools import ToolsHelper
from atf_python.sys.net.vnet import VnetTestTemplate


def _append_ah_headers(sp, pkt, total, inner_proto):
    """Return pkt with `total` chained AH headers stacked on top of it.

    Every header except the last announces another AH header (protocol 51)
    as its next header; the last one announces `inner_proto`.  `sp` is the
    scapy.all module imported by the calling test.
    """
    for _ in range(total - 1):
        pkt = pkt / sp.AH(nh=51, payloadlen=1)
    return pkt / sp.AH(nh=inner_proto, payloadlen=1)


def _assert_echo_request(icmp):
    """Assert that the given ICMP layer is an echo request."""
    assert icmp.type == 8  # 'echo-request'


def _seen_on_wire(sp, pkt, sendif, recvif, layer, check=None):
    """Send pkt via sendif and report whether `layer` shows up on recvif.

    Sniffs on recvif for three seconds, dumps every captured frame, and
    returns True if at least one frame carries `layer`.  When `check` is
    given it is called with each matching layer as the frames are walked.
    """
    # Hold on to the sender object for the duration of the capture.
    sender = DelayedSend(pkt, sendif)
    captured = sp.sniff(iface=recvif, timeout=3)
    print(captured)

    seen = False
    for frame in captured:
        frame.show()

        hit = frame.getlayer(layer)
        if hit:
            if check is not None:
                check(hit)
            seen = True
    return seen


class TestHeader(VnetTestTemplate):
    REQUIRED_MODULES = [ "pf" ]
    # Both vnets are attached to if1 and if2; each interface carries one
    # IPv4 prefix pair.
    TOPOLOGY = {
        "vnet1": {"ifaces": ["if1", "if2"]},
        "vnet2": {"ifaces": ["if1", "if2"]},
        "if1": {"prefixes4": [("192.0.2.2/24", "192.0.2.1/24")]},
        "if2": {"prefixes4": [("198.51.100.1/24", "198.51.100.2/24")]},
    }

    def vnet2_handler(self, vnet):
        """Set up vnet2: IPv4 forwarding, a static ARP entry and pf with a bare pass rule."""
        setup_cmds = (
            "/sbin/sysctl net.inet.ip.forwarding=1",
            "/usr/sbin/arp -s 198.51.100.3 00:01:02:03:04:05",
            "/sbin/pfctl -e",
            "/sbin/pfctl -x loud",
        )
        for cmd in setup_cmds:
            ToolsHelper.print_output(cmd)
        ToolsHelper.pf_rules([
            "pass",
        ])

    @pytest.mark.require_user("root")
    @pytest.mark.require_progs(["scapy"])
    def test_too_many(self):
        """Verify that we drop packets with silly numbers of headers."""

        ifaces = self.vnet.iface_alias_map
        sendif = ifaces["if1"].name
        recvif = ifaces["if2"].name
        our_mac = ToolsHelper.get_output("/sbin/ifconfig %s ether | awk '/ether/ { print $2; }'" % sendif)
        # The gateway MAC is derived from ours by swapping the trailing 0a for 0b.
        gw_mac = re.sub("0a$", "0b", our_mac)

        ToolsHelper.print_output("/sbin/route add default 192.0.2.1")

        # Import in the correct vnet, so as not to confuse Scapy
        import scapy.all as sp

        def base():
            # Fresh Ethernet/IPv4 prefix shared by every probe below.
            return sp.Ether(dst=gw_mac) / sp.IP(dst="198.51.100.3")

        # Sanity check: a plain echo request must show up on the receive
        # interface.
        ping = base() / sp.ICMP(type='echo-request')
        assert _seen_on_wire(sp, ping, sendif, recvif, sp.ICMP,
                             check=_assert_echo_request)

        # Up to 19 AH headers will pass
        probe = _append_ah_headers(sp, base(), 19, 1) \
            / sp.ICMP(type='echo-request')
        assert _seen_on_wire(sp, probe, sendif, recvif, sp.AH)

        # But more will get dropped
        probe = _append_ah_headers(sp, base(), 20, 1) \
            / sp.ICMP(type='echo-request')
        assert not _seen_on_wire(sp, probe, sendif, recvif, sp.AH)


class TestHeader6(VnetTestTemplate):
    REQUIRED_MODULES = [ "pf" ]
    SKIP_MODULES = [ "ipfilter" ]
    # Both vnets are attached to if1 and if2; each interface carries one
    # IPv6 prefix pair.
    TOPOLOGY = {
        "vnet1": {"ifaces": ["if1", "if2"]},
        "vnet2": {"ifaces": ["if1", "if2"]},
        "if1": {"prefixes6": [("2001:db8::2/64", "2001:db8::1/64")]},
        "if2": {"prefixes6": [("2001:db8:1::2/64", "2001:db8:1::1/64")]},
    }

    def vnet2_handler(self, vnet):
        """Set up vnet2: IPv6 forwarding, a static NDP entry and pf with a bare pass rule."""
        setup_cmds = (
            "/sbin/sysctl net.inet6.ip6.forwarding=1",
            "/usr/sbin/ndp -s 2001:db8:1::3 00:01:02:03:04:05",
            "/sbin/pfctl -e",
            "/sbin/pfctl -x loud",
        )
        for cmd in setup_cmds:
            ToolsHelper.print_output(cmd)
        ToolsHelper.pf_rules([
            "pass",
        ])

    @pytest.mark.require_user("root")
    @pytest.mark.require_progs(["scapy"])
    def test_too_many(self):
        """Verify that we drop packets with silly numbers of headers."""
        ToolsHelper.print_output("/sbin/ifconfig")

        ifaces = self.vnet.iface_alias_map
        sendif = ifaces["if1"].name
        recvif = ifaces["if2"].name
        our_mac = ToolsHelper.get_output("/sbin/ifconfig %s ether | awk '/ether/ { print $2; }'" % sendif)
        # The gateway MAC is derived from ours by swapping the trailing 0a for 0b.
        gw_mac = re.sub("0a$", "0b", our_mac)

        ToolsHelper.print_output("/sbin/route -6 add default 2001:db8::1")

        # Import in the correct vnet, so as not to confuse Scapy
        import scapy.all as sp

        def base():
            # Fresh Ethernet/IPv6 prefix shared by every probe below.
            return sp.Ether(src=our_mac, dst=gw_mac) \
                / sp.IPv6(src="2001:db8::2", dst="2001:db8:1::3")

        # Sanity check: a plain echo request must show up on the receive
        # interface.
        ping = base() / sp.ICMPv6EchoRequest()
        assert _seen_on_wire(sp, ping, sendif, recvif, sp.ICMPv6EchoRequest)

        # Up to 19 AH headers will pass
        probe = _append_ah_headers(sp, base(), 19, 58) \
            / sp.ICMPv6EchoRequest()
        assert _seen_on_wire(sp, probe, sendif, recvif, sp.AH)

        # But more will get dropped
        probe = _append_ah_headers(sp, base(), 20, 58) \
            / sp.ICMPv6EchoRequest()
        assert not _seen_on_wire(sp, probe, sendif, recvif, sp.AH)