xref: /freebsd/tests/sys/netpfil/pf/header.py (revision ae07a5805b1906f29e786f415d67bef334557bd3)
1#
2# SPDX-License-Identifier: BSD-2-Clause
3#
4# Copyright (c) 2025 Rubicon Communications, LLC (Netgate)
5#
6# Redistribution and use in source and binary forms, with or without
7# modification, are permitted provided that the following conditions
8# are met:
9# 1. Redistributions of source code must retain the above copyright
10#    notice, this list of conditions and the following disclaimer.
11# 2. Redistributions in binary form must reproduce the above copyright
12#    notice, this list of conditions and the following disclaimer in the
13#    documentation and/or other materials provided with the distribution.
14#
15# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
16# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
17# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
18# ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
19# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
20# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
21# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
22# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
23# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
24# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
25# SUCH DAMAGE.
26
27import pytest
28import re
29from utils import DelayedSend
30from atf_python.sys.net.tools import ToolsHelper
31from atf_python.sys.net.vnet import VnetTestTemplate
32
class TestHeader(VnetTestTemplate):
    """Check how many chained AH headers pf lets through on IPv4.

    Packets are sent out of "if1" and sniffed on "if2" of the vnet the test
    runs in; vnet2 (see vnet2_handler) forwards between the two with pf
    enabled and a single "pass" rule.
    """

    REQUIRED_MODULES = ["pf"]
    TOPOLOGY = {
        "vnet1": {"ifaces": ["if1", "if2"]},
        "vnet2": {"ifaces": ["if1", "if2"]},
        "if1": {"prefixes4": [("192.0.2.2/24", "192.0.2.1/24")]},
        "if2": {"prefixes4": [("198.51.100.1/24", "198.51.100.2/24")]},
    }

    def vnet2_handler(self, vnet):
        """Set up vnet2: IPv4 forwarding, a static ARP entry and pf."""
        ToolsHelper.print_output("/sbin/sysctl net.inet.ip.forwarding=1")
        # Static ARP entry for the address the test packets are sent to.
        ToolsHelper.print_output(
            "/usr/sbin/arp -s 198.51.100.3 00:01:02:03:04:05")
        ToolsHelper.print_output("/sbin/pfctl -e")
        ToolsHelper.print_output("/sbin/pfctl -x loud")
        ToolsHelper.pf_rules([
            "pass",
            ])

    @staticmethod
    def _ah_chain_ping(sp, gw_mac, ah_count):
        """Build an echo request wrapped in ah_count chained AH headers.

        sp is the imported scapy.all module and gw_mac the Ethernet
        destination.  All but the last AH header name AH (protocol 51) as
        their next header; the last one names ICMP (protocol 1).
        """
        pkt = sp.Ether(dst=gw_mac) / sp.IP(dst="198.51.100.3")
        for _ in range(ah_count - 1):
            pkt = pkt / sp.AH(nh=51, payloadlen=1)
        return pkt / sp.AH(nh=1, payloadlen=1) / sp.ICMP(type='echo-request')

    @staticmethod
    def _send_and_sniff(sp, pkt, send_ifname, recv_ifname):
        """Send pkt via DelayedSend and return the capture from recv_ifname.

        The capture summary and every captured packet are printed so that
        they show up in the test output.
        """
        # Keep a reference to the sender for as long as the capture runs.
        # NOTE(review): DelayedSend presumably transmits after a short
        # delay so the sniffer is already listening - confirm in utils.
        sender = DelayedSend(pkt, send_ifname)  # noqa: F841
        captured = sp.sniff(iface=recv_ifname, timeout=3)
        print(captured)
        for frame in captured:
            frame.show()
        return captured

    @pytest.mark.require_user("root")
    @pytest.mark.require_progs(["scapy"])
    def test_too_many(self):
        """Verify that we drop packets with silly numbers of headers."""

        sendif = self.vnet.iface_alias_map["if1"]
        recvif = self.vnet.iface_alias_map["if2"].name
        # NOTE(review): epairb is presumably the peer end of the epair,
        # i.e. the forwarding vnet's interface - confirm in atf_python.
        gw_mac = sendif.epairb.ether

        ToolsHelper.print_output("/sbin/route add default 192.0.2.1")

        # Import in the correct vnet, so as to not confuse Scapy
        import scapy.all as sp

        # Sanity check: a plain echo request has to show up on the
        # receiving interface.
        pkt = sp.Ether(dst=gw_mac) \
            / sp.IP(dst="198.51.100.3") \
            / sp.ICMP(type='echo-request')
        found = False
        for frame in self._send_and_sniff(sp, pkt, sendif.name, recvif):
            icmp = frame.getlayer(sp.ICMP)
            if not icmp:
                continue
            assert icmp.type == 8  # 'echo-request'
            found = True
        assert found

        # Up to 19 AH headers will pass
        pkt = self._ah_chain_ping(sp, gw_mac, 19)
        captured = self._send_and_sniff(sp, pkt, sendif.name, recvif)
        assert any(frame.getlayer(sp.AH) for frame in captured)

        # But more will get dropped
        pkt = self._ah_chain_ping(sp, gw_mac, 20)
        captured = self._send_and_sniff(sp, pkt, sendif.name, recvif)
        assert not any(frame.getlayer(sp.AH) for frame in captured)
124
class TestHeader6(VnetTestTemplate):
    """Check how many chained AH headers pf lets through on IPv6.

    Packets are sent out of "if1" and sniffed on "if2" of the vnet the test
    runs in; vnet2 (see vnet2_handler) forwards between the two with pf
    enabled and a single "pass" rule.
    """

    REQUIRED_MODULES = ["pf"]
    SKIP_MODULES = ["ipfilter"]
    TOPOLOGY = {
        "vnet1": {"ifaces": ["if1", "if2"]},
        "vnet2": {"ifaces": ["if1", "if2"]},
        "if1": {"prefixes6": [("2001:db8::2/64", "2001:db8::1/64")]},
        "if2": {"prefixes6": [("2001:db8:1::2/64", "2001:db8:1::1/64")]},
    }

    def vnet2_handler(self, vnet):
        """Set up vnet2: IPv6 forwarding, a static NDP entry and pf."""
        ToolsHelper.print_output("/sbin/sysctl net.inet6.ip6.forwarding=1")
        # Static neighbour entry for the address the test packets are
        # sent to.
        ToolsHelper.print_output(
            "/usr/sbin/ndp -s 2001:db8:1::3 00:01:02:03:04:05")
        ToolsHelper.print_output("/sbin/pfctl -e")
        ToolsHelper.print_output("/sbin/pfctl -x loud")
        ToolsHelper.pf_rules([
            "pass",
            ])

    @staticmethod
    def _ah_chain_ping(sp, our_mac, gw_mac, ah_count):
        """Build an ICMPv6 echo request wrapped in ah_count AH headers.

        sp is the imported scapy.all module; our_mac and gw_mac are the
        Ethernet source and destination.  All but the last AH header name
        AH (protocol 51) as their next header; the last one names ICMPv6
        (protocol 58).
        """
        pkt = sp.Ether(src=our_mac, dst=gw_mac) \
            / sp.IPv6(src="2001:db8::2", dst="2001:db8:1::3")
        for _ in range(ah_count - 1):
            pkt = pkt / sp.AH(nh=51, payloadlen=1)
        return pkt / sp.AH(nh=58, payloadlen=1) / sp.ICMPv6EchoRequest()

    @staticmethod
    def _send_and_sniff(sp, pkt, send_ifname, recv_ifname):
        """Send pkt via DelayedSend and return the capture from recv_ifname.

        The capture summary and every captured packet are printed so that
        they show up in the test output.
        """
        # Keep a reference to the sender for as long as the capture runs.
        # NOTE(review): DelayedSend presumably transmits after a short
        # delay so the sniffer is already listening - confirm in utils.
        sender = DelayedSend(pkt, send_ifname)  # noqa: F841
        captured = sp.sniff(iface=recv_ifname, timeout=3)
        print(captured)
        for frame in captured:
            frame.show()
        return captured

    @pytest.mark.require_user("root")
    @pytest.mark.require_progs(["scapy"])
    def test_too_many(self):
        """Verify that we drop packets with silly numbers of headers."""
        ToolsHelper.print_output("/sbin/ifconfig")

        sendif = self.vnet.iface_alias_map["if1"]
        recvif = self.vnet.iface_alias_map["if2"].name
        our_mac = sendif.ether
        # NOTE(review): epairb is presumably the peer end of the epair,
        # i.e. the forwarding vnet's interface - confirm in atf_python.
        gw_mac = sendif.epairb.ether

        ToolsHelper.print_output("/sbin/route -6 add default 2001:db8::1")

        # Import in the correct vnet, so as to not confuse Scapy
        import scapy.all as sp

        # Sanity check: a plain echo request has to show up on the
        # receiving interface.
        pkt = sp.Ether(src=our_mac, dst=gw_mac) \
            / sp.IPv6(src="2001:db8::2", dst="2001:db8:1::3") \
            / sp.ICMPv6EchoRequest()
        captured = self._send_and_sniff(sp, pkt, sendif.name, recvif)
        assert any(
            frame.getlayer(sp.ICMPv6EchoRequest) for frame in captured)

        # Up to 19 AH headers will pass
        pkt = self._ah_chain_ping(sp, our_mac, gw_mac, 19)
        captured = self._send_and_sniff(sp, pkt, sendif.name, recvif)
        assert any(frame.getlayer(sp.AH) for frame in captured)

        # But more will get dropped
        pkt = self._ah_chain_ping(sp, our_mac, gw_mac, 20)
        captured = self._send_and_sniff(sp, pkt, sendif.name, recvif)
        assert not any(frame.getlayer(sp.AH) for frame in captured)
217