xref: /freebsd/tests/sys/netpfil/pf/header.py (revision 7659d0fa2b873374623e7582e38fb2fe92056c87)
1#
2# SPDX-License-Identifier: BSD-2-Clause
3#
4# Copyright (c) 2025 Rubicon Communications, LLC (Netgate)
5#
6# Redistribution and use in source and binary forms, with or without
7# modification, are permitted provided that the following conditions
8# are met:
9# 1. Redistributions of source code must retain the above copyright
10#    notice, this list of conditions and the following disclaimer.
11# 2. Redistributions in binary form must reproduce the above copyright
12#    notice, this list of conditions and the following disclaimer in the
13#    documentation and/or other materials provided with the distribution.
14#
15# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
16# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
17# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
18# ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
19# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
20# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
21# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
22# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
23# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
24# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
25# SUCH DAMAGE.
26
27import pytest
28import re
29from utils import DelayedSend
30from atf_python.sys.net.tools import ToolsHelper
31from atf_python.sys.net.vnet import VnetTestTemplate
32
class TestHeader(VnetTestTemplate):
    """Check how pf treats long chains of IPv4 AH headers on forwarded traffic.

    Packets are injected on "if1" towards 198.51.100.3 and captured on "if2".
    vnet2 has IPv4 forwarding and pf enabled with a bare "pass" ruleset.
    """

    REQUIRED_MODULES = ["pf"]
    TOPOLOGY = {
        "vnet1": {"ifaces": ["if1", "if2"]},
        "vnet2": {"ifaces": ["if1", "if2"]},
        "if1": {"prefixes4": [("192.0.2.2/24", "192.0.2.1/24")]},
        "if2": {"prefixes4": [("198.51.100.1/24", "198.51.100.2/24")]},
    }

    def vnet2_handler(self, vnet):
        """Turn vnet2 into a forwarding host with pf enabled.

        A static ARP entry is added for 198.51.100.3, the destination used by
        the test; presumably this lets the packet be forwarded even though no
        host owns that address -- confirm against the topology.
        """
        for cmd in (
            "/sbin/sysctl net.inet.ip.forwarding=1",
            "/usr/sbin/arp -s 198.51.100.3 00:01:02:03:04:05",
            "/sbin/pfctl -e",
            "/sbin/pfctl -x loud",
        ):
            ToolsHelper.print_output(cmd)
        ToolsHelper.pf_rules(["pass"])

    @staticmethod
    def _ah_echo_request(sp, gw_mac, ah_count):
        """Build an ICMP echo request preceded by ah_count AH headers.

        sp is the imported scapy.all module, gw_mac the destination MAC
        address.  ah_count must be at least 1: every AH header but the last
        names another AH header (protocol 51) as its successor, and the last
        one names ICMP (protocol 1).
        """
        pkt = sp.Ether(dst=gw_mac) / sp.IP(dst="198.51.100.3")
        for _ in range(ah_count - 1):
            pkt = pkt / sp.AH(nh=51, payloadlen=1)
        return pkt / sp.AH(nh=1, payloadlen=1) / sp.ICMP(type='echo-request')

    @staticmethod
    def _capture_layers(sp, pkt, sendif, recvif, layer):
        """Send pkt on sendif and return the `layer` instances seen on recvif.

        Every captured frame is dumped to stdout to help debugging.  The
        returned list holds one entry per captured frame that contains the
        requested Scapy layer, so it is empty when nothing matching arrived.
        """
        # The sender is created before sniffing starts; presumably DelayedSend
        # transmits after a short delay so the capture is already running --
        # see utils.DelayedSend.  Keep a reference while we sniff.
        sender = DelayedSend(pkt, sendif)
        captured = sp.sniff(iface=recvif, timeout=3)
        print(captured)

        layers = []
        for frame in captured:
            frame.show()

            hit = frame.getlayer(layer)
            if not hit:
                continue
            layers.append(hit)
        return layers

    @pytest.mark.require_user("root")
    @pytest.mark.require_progs(["scapy"])
    def test_too_many(self):
        "Verify that we drop packets with silly numbers of headers."

        sendif = self.vnet.iface_alias_map["if1"].name
        recvif = self.vnet.iface_alias_map["if2"].name
        our_mac = ToolsHelper.get_output("/sbin/ifconfig %s ether | awk '/ether/ { print $2; }'" % sendif)
        # The gateway's MAC is our own with the trailing "0a" swapped for
        # "0b"; presumably that is the other end of the same epair.
        gw_mac = re.sub("0a$", "0b", our_mac)

        ToolsHelper.print_output("/sbin/route add default 192.0.2.1")

        # Import in the correct vnet, so as to not confuse Scapy
        import scapy.all as sp

        # Sanity check: a plain echo request must be forwarded and show up,
        # unmodified in type, on the receiving interface.
        pkt = sp.Ether(dst=gw_mac) \
            / sp.IP(dst="198.51.100.3") \
            / sp.ICMP(type='echo-request')
        echoes = self._capture_layers(sp, pkt, sendif, recvif, sp.ICMP)
        for icmp in echoes:
            assert icmp.type == 8 # 'echo-request'
        assert echoes

        # Up to 19 AH headers will pass
        pkt = self._ah_echo_request(sp, gw_mac, 19)
        assert self._capture_layers(sp, pkt, sendif, recvif, sp.AH)

        # But more will get dropped
        pkt = self._ah_echo_request(sp, gw_mac, 20)
        assert not self._capture_layers(sp, pkt, sendif, recvif, sp.AH)
125
class TestHeader6(VnetTestTemplate):
    """Check how pf treats long chains of AH headers on forwarded IPv6 traffic.

    Packets are injected on "if1" towards 2001:db8:1::3 and captured on "if2".
    vnet2 has IPv6 forwarding and pf enabled with a bare "pass" ruleset.
    """

    REQUIRED_MODULES = ["pf"]
    SKIP_MODULES = ["ipfilter"]
    TOPOLOGY = {
        "vnet1": {"ifaces": ["if1", "if2"]},
        "vnet2": {"ifaces": ["if1", "if2"]},
        "if1": {"prefixes6": [("2001:db8::2/64", "2001:db8::1/64")]},
        "if2": {"prefixes6": [("2001:db8:1::2/64", "2001:db8:1::1/64")]},
    }

    def vnet2_handler(self, vnet):
        """Turn vnet2 into an IPv6 forwarding host with pf enabled.

        A static NDP entry is added for 2001:db8:1::3, the destination used by
        the test; presumably this lets the packet be forwarded even though no
        host owns that address -- confirm against the topology.
        """
        for cmd in (
            "/sbin/sysctl net.inet6.ip6.forwarding=1",
            "/usr/sbin/ndp -s 2001:db8:1::3 00:01:02:03:04:05",
            "/sbin/pfctl -e",
            "/sbin/pfctl -x loud",
        ):
            ToolsHelper.print_output(cmd)
        ToolsHelper.pf_rules(["pass"])

    @staticmethod
    def _ah_echo_request(sp, our_mac, gw_mac, ah_count):
        """Build an ICMPv6 echo request preceded by ah_count AH headers.

        sp is the imported scapy.all module; our_mac and gw_mac are the source
        and destination MAC addresses.  ah_count must be at least 1: every AH
        header but the last names another AH header (protocol 51) as its
        successor, and the last one names ICMPv6 (protocol 58).
        """
        pkt = sp.Ether(src=our_mac, dst=gw_mac) \
            / sp.IPv6(src="2001:db8::2", dst="2001:db8:1::3")
        for _ in range(ah_count - 1):
            pkt = pkt / sp.AH(nh=51, payloadlen=1)
        return pkt / sp.AH(nh=58, payloadlen=1) / sp.ICMPv6EchoRequest()

    @staticmethod
    def _capture_layers(sp, pkt, sendif, recvif, layer):
        """Send pkt on sendif and return the `layer` instances seen on recvif.

        Every captured frame is dumped to stdout to help debugging.  The
        returned list holds one entry per captured frame that contains the
        requested Scapy layer, so it is empty when nothing matching arrived.
        """
        # The sender is created before sniffing starts; presumably DelayedSend
        # transmits after a short delay so the capture is already running --
        # see utils.DelayedSend.  Keep a reference while we sniff.
        sender = DelayedSend(pkt, sendif)
        captured = sp.sniff(iface=recvif, timeout=3)
        print(captured)

        layers = []
        for frame in captured:
            frame.show()

            hit = frame.getlayer(layer)
            if not hit:
                continue
            layers.append(hit)
        return layers

    @pytest.mark.require_user("root")
    @pytest.mark.require_progs(["scapy"])
    def test_too_many(self):
        "Verify that we drop packets with silly numbers of headers."
        ToolsHelper.print_output("/sbin/ifconfig")

        sendif = self.vnet.iface_alias_map["if1"].name
        recvif = self.vnet.iface_alias_map["if2"].name
        our_mac = ToolsHelper.get_output("/sbin/ifconfig %s ether | awk '/ether/ { print $2; }'" % sendif)
        # The gateway's MAC is our own with the trailing "0a" swapped for
        # "0b"; presumably that is the other end of the same epair.
        gw_mac = re.sub("0a$", "0b", our_mac)

        ToolsHelper.print_output("/sbin/route -6 add default 2001:db8::1")

        # Import in the correct vnet, so as to not confuse Scapy
        import scapy.all as sp

        # Sanity check: a plain echo request must be forwarded and show up
        # on the receiving interface.
        pkt = sp.Ether(src=our_mac, dst=gw_mac) \
            / sp.IPv6(src="2001:db8::2", dst="2001:db8:1::3") \
            / sp.ICMPv6EchoRequest()
        assert self._capture_layers(sp, pkt, sendif, recvif,
                                    sp.ICMPv6EchoRequest)

        # Up to 19 AH headers will pass
        pkt = self._ah_echo_request(sp, our_mac, gw_mac, 19)
        assert self._capture_layers(sp, pkt, sendif, recvif, sp.AH)

        # But more will get dropped
        pkt = self._ah_echo_request(sp, our_mac, gw_mac, 20)
        assert not self._capture_layers(sp, pkt, sendif, recvif, sp.AH)
218