#
# SPDX-License-Identifier: BSD-2-Clause
#
# Copyright (c) 2025 Rubicon Communications, LLC (Netgate)
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
# 1. Redistributions of source code must retain the above copyright
#    notice, this list of conditions and the following disclaimer.
# 2. Redistributions in binary form must reproduce the above copyright
#    notice, this list of conditions and the following disclaimer in the
#    documentation and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
# SUCH DAMAGE.
import pytest
import re
from utils import DelayedSend
from atf_python.sys.net.tools import ToolsHelper
from atf_python.sys.net.vnet import VnetTestTemplate

# Seconds to listen on the receiving interface for a forwarded packet.
_SNIFF_TIMEOUT = 3

# IP protocol numbers used in the "next header" fields built below.
_PROTO_ICMP = 1
_PROTO_AH = 51
_PROTO_ICMPV6 = 58

# Per the checks in these tests: a chain of this many AH headers is still
# forwarded, one more is dropped.
_MAX_AH_HEADERS = 19


def _append_ah_chain(sp, pkt, count, final_nh):
    """Return ``pkt`` with ``count`` AH headers stacked on top of it.

    Every header except the last names AH as its next header; the last one
    names ``final_nh`` so the caller can append the matching upper-layer
    payload.  ``sp`` is the already imported ``scapy.all`` module.
    """
    for _ in range(count - 1):
        pkt = pkt / sp.AH(nh=_PROTO_AH, payloadlen=1)
    return pkt / sp.AH(nh=final_nh, payloadlen=1)


def _send_and_sniff(sp, pkt, send_ifname, recv_ifname):
    """Send ``pkt`` on ``send_ifname`` and return what shows up on
    ``recv_ifname`` within ``_SNIFF_TIMEOUT`` seconds.

    The capture is printed so it ends up in the test log.
    """
    # The sender is created before the capture starts; presumably it
    # transmits asynchronously after a short delay (see utils.DelayedSend).
    # Keep a reference to it for the duration of the capture.
    sender = DelayedSend(pkt, send_ifname)
    captured = sp.sniff(iface=recv_ifname, timeout=_SNIFF_TIMEOUT)
    print(captured)
    return captured


def _show_and_collect(captured, layer_cls):
    """Dump every captured packet and return the ``layer_cls`` layers found.

    Packets that do not carry the requested layer are shown but otherwise
    ignored.
    """
    layers = []
    for frame in captured:
        frame.show()

        layer = frame.getlayer(layer_cls)
        if not layer:
            continue
        layers.append(layer)
    return layers


class TestHeader(VnetTestTemplate):
    """IPv4: pf forwards packets with up to 19 chained AH headers and drops
    packets that carry more.

    Packets are sent on if1 towards 198.51.100.3 and the test listens on
    if2 to see whether the router (vnet2) forwarded them.
    """

    REQUIRED_MODULES = [ "pf" ]
    TOPOLOGY = {
        "vnet1": {"ifaces": ["if1", "if2"]},
        "vnet2": {"ifaces": ["if1", "if2"]},
        "if1": {"prefixes4": [("192.0.2.2/24", "192.0.2.1/24")]},
        "if2": {"prefixes4": [("198.51.100.1/24", "198.51.100.2/24")]},
    }

    def vnet2_handler(self, vnet):
        """Set up the router side: forwarding on, pf enabled, pass all.

        Presumably invoked by the VnetTestTemplate framework inside vnet2;
        confirm against atf_python.
        """
        ToolsHelper.print_output("/sbin/sysctl net.inet.ip.forwarding=1")
        # 198.51.100.3 is not assigned to any interface in TOPOLOGY; the
        # static ARP entry gives the router a link-layer address for it.
        ToolsHelper.print_output("/usr/sbin/arp -s 198.51.100.3 00:01:02:03:04:05")
        ToolsHelper.print_output("/sbin/pfctl -e")
        ToolsHelper.print_output("/sbin/pfctl -x loud")
        ToolsHelper.pf_rules([
            "pass",
        ])

    @pytest.mark.require_user("root")
    @pytest.mark.require_progs(["scapy"])
    def test_too_many(self):
        "Verify that we drop packets with silly numbers of headers."

        sendif = self.vnet.iface_alias_map["if1"]
        recvif = self.vnet.iface_alias_map["if2"].name
        gw_mac = sendif.epairb.ether

        ToolsHelper.print_output("/sbin/route add default 192.0.2.1")

        # Import in the correct vnet, so as to not confuse Scapy
        import scapy.all as sp

        def base_packet():
            # Fresh Ethernet/IP headers addressed through the gateway.
            return sp.Ether(dst=gw_mac) / sp.IP(dst="198.51.100.3")

        # Sanity check, ensure we get replies to normal ping
        pkt = base_packet() / sp.ICMP(type='echo-request')
        reply = _send_and_sniff(sp, pkt, sendif.name, recvif)
        icmp_layers = _show_and_collect(reply, sp.ICMP)
        for icmp in icmp_layers:
            assert icmp.type == 8 # 'echo-request'
        assert icmp_layers

        # Up to 19 AH headers will pass
        pkt = _append_ah_chain(sp, base_packet(), _MAX_AH_HEADERS, _PROTO_ICMP) \
            / sp.ICMP(type='echo-request')
        reply = _send_and_sniff(sp, pkt, sendif.name, recvif)
        assert _show_and_collect(reply, sp.AH)

        # But more will get dropped
        pkt = _append_ah_chain(sp, base_packet(), _MAX_AH_HEADERS + 1, _PROTO_ICMP) \
            / sp.ICMP(type='echo-request')
        reply = _send_and_sniff(sp, pkt, sendif.name, recvif)
        assert not _show_and_collect(reply, sp.AH)


class TestHeader6(VnetTestTemplate):
    """IPv6: pf forwards packets with up to 19 chained AH headers and drops
    packets that carry more.

    Packets are sent on if1 towards 2001:db8:1::3 and the test listens on
    if2 to see whether the router (vnet2) forwarded them.
    """

    REQUIRED_MODULES = [ "pf" ]
    SKIP_MODULES = [ "ipfilter" ]
    TOPOLOGY = {
        "vnet1": {"ifaces": ["if1", "if2"]},
        "vnet2": {"ifaces": ["if1", "if2"]},
        "if1": {"prefixes6": [("2001:db8::2/64", "2001:db8::1/64")]},
        "if2": {"prefixes6": [("2001:db8:1::2/64", "2001:db8:1::1/64")]},
    }

    def vnet2_handler(self, vnet):
        """Set up the router side: IPv6 forwarding on, pf enabled, pass all.

        Presumably invoked by the VnetTestTemplate framework inside vnet2;
        confirm against atf_python.
        """
        ToolsHelper.print_output("/sbin/sysctl net.inet6.ip6.forwarding=1")
        # 2001:db8:1::3 is not assigned to any interface in TOPOLOGY; the
        # static NDP entry gives the router a link-layer address for it.
        ToolsHelper.print_output("/usr/sbin/ndp -s 2001:db8:1::3 00:01:02:03:04:05")
        ToolsHelper.print_output("/sbin/pfctl -e")
        ToolsHelper.print_output("/sbin/pfctl -x loud")
        ToolsHelper.pf_rules([
            "pass",
        ])

    @pytest.mark.require_user("root")
    @pytest.mark.require_progs(["scapy"])
    def test_too_many(self):
        "Verify that we drop packets with silly numbers of headers."
        # Record the interface configuration in the test log.
        ToolsHelper.print_output("/sbin/ifconfig")

        sendif = self.vnet.iface_alias_map["if1"]
        recvif = self.vnet.iface_alias_map["if2"].name
        our_mac = sendif.ether
        gw_mac = sendif.epairb.ether

        ToolsHelper.print_output("/sbin/route -6 add default 2001:db8::1")

        # Import in the correct vnet, so as to not confuse Scapy
        import scapy.all as sp

        def base_packet():
            # Fresh Ethernet/IPv6 headers addressed through the gateway.
            return sp.Ether(src=our_mac, dst=gw_mac) \
                / sp.IPv6(src="2001:db8::2", dst="2001:db8:1::3")

        # Sanity check, ensure we get replies to normal ping
        pkt = base_packet() / sp.ICMPv6EchoRequest()
        reply = _send_and_sniff(sp, pkt, sendif.name, recvif)
        assert _show_and_collect(reply, sp.ICMPv6EchoRequest)

        # Up to 19 AH headers will pass
        pkt = _append_ah_chain(sp, base_packet(), _MAX_AH_HEADERS, _PROTO_ICMPV6) \
            / sp.ICMPv6EchoRequest()
        reply = _send_and_sniff(sp, pkt, sendif.name, recvif)
        assert _show_and_collect(reply, sp.AH)

        # But more will get dropped
        pkt = _append_ah_chain(sp, base_packet(), _MAX_AH_HEADERS + 1, _PROTO_ICMPV6) \
            / sp.ICMPv6EchoRequest()
        reply = _send_and_sniff(sp, pkt, sendif.name, recvif)
        assert not _show_and_collect(reply, sp.AH)