xref: /freebsd/tests/sys/netpfil/pf/return.py (revision 16665c74788c6c75b84919d5a083369a6388f0b4)
1#
2# SPDX-License-Identifier: BSD-2-Clause
3#
4# Copyright (c) 2025 Rubicon Communications, LLC (Netgate)
5#
6# Redistribution and use in source and binary forms, with or without
7# modification, are permitted provided that the following conditions
8# are met:
9# 1. Redistributions of source code must retain the above copyright
10#    notice, this list of conditions and the following disclaimer.
11# 2. Redistributions in binary form must reproduce the above copyright
12#    notice, this list of conditions and the following disclaimer in the
13#    documentation and/or other materials provided with the distribution.
14#
15# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
16# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
17# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
18# ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
19# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
20# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
21# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
22# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
23# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
24# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
25# SUCH DAMAGE.
26#
27import pytest
28import subprocess
29import re
30from atf_python.sys.net.tools import ToolsHelper
31from atf_python.sys.net.vnet import VnetTestTemplate
32
33def check(cmd):
34    ps = subprocess.Popen(cmd, shell=True)
35    ret = ps.wait()
36    if ret != 0:
37        raise Exception("Command %s returned %d" % (cmd, ret))
38
39class TestReturn(VnetTestTemplate):
40    REQUIRED_MODULES = [ "pf" ]
41    TOPOLOGY = {
42        "vnet1": {"ifaces": ["if1"]},
43        "vnet2": {"ifaces": ["if1", "if2"]},
44        "vnet3": {"ifaces": ["if2"]},
45        "if1": {"prefixes4": [("192.0.2.2/24", "192.0.2.1/24")]},
46        "if2": {"prefixes4": [("198.51.100.1/24", "198.51.100.2/24")]},
47    }
48
49    def vnet2_handler(self, vnet):
50        ifname = vnet.iface_alias_map["if1"].name
51        if2name = vnet.iface_alias_map["if2"].name
52
53        ToolsHelper.print_output("/sbin/pfctl -e")
54        ToolsHelper.pf_rules([
55            "nat on %s inet from 192.0.2.0/24 to any -> (%s)" % (ifname, ifname),
56            "block return",
57            "pass inet proto icmp icmp-type echoreq",
58            ])
59
60        ToolsHelper.print_output("/sbin/sysctl net.inet.ip.forwarding=1")
61        ToolsHelper.print_output("/sbin/pfctl -x loud")
62
63    def vnet3_handler(self, vnet):
64        ToolsHelper.print_output("/sbin/route add default 198.51.100.1")
65
66    def common_setup(self):
67        ToolsHelper.print_output("/sbin/route add default 192.0.2.1")
68
69        # Sanity check
70        check("/sbin/ping -c 1 192.0.2.1")
71        check("/sbin/ping -c 1 198.51.100.1")
72        check("/sbin/ping -c 2 198.51.100.2")
73
74    @pytest.mark.require_user("root")
75    @pytest.mark.require_progs(["scapy"])
76    def test_tcp(self):
77        self.common_setup()
78
79        # Import in the correct vnet, so at to not confuse Scapy
80        import scapy.all as sp
81
82        # Send a TCP SYN, expect a RST
83        pkt = sp.IP(src="192.0.2.2", dst="198.51.100.2") \
84            / sp.TCP(sport=4321, dport=1234, flags="S")
85        print(pkt)
86        reply = sp.sr1(pkt, timeout=3, verbose=False)
87        print(reply)
88
89        ip = reply.getlayer(sp.IP)
90        tcp = reply.getlayer(sp.TCP)
91        assert ip
92        assert ip.src == "198.51.100.2"
93        assert ip.dst == "192.0.2.2"
94        assert tcp
95        assert tcp.sport == 1234
96        assert tcp.dport == 4321
97        assert "R" in tcp.flags
98
99    @pytest.mark.require_user("root")
100    @pytest.mark.require_progs(["scapy"])
101    def test_udp(self):
102        self.common_setup()
103
104        # Import in the correct vnet, so at to not confuse Scapy
105        import scapy.all as sp
106
107        # Send a UDP packet, expect ICMP error
108        pkt = sp.IP(dst="198.51.100.2") \
109            / sp.UDP(sport=4321, dport=1234)
110        print(pkt)
111        reply = sp.sr1(pkt, timeout=3, verbose=False)
112        print(reply)
113        ip = reply.getlayer(sp.IP)
114        icmp = reply.getlayer(sp.ICMP)
115        udp = reply.getlayer(sp.UDPerror)
116
117        assert ip
118        assert ip.src == "192.0.2.1"
119        assert ip.dst == "192.0.2.2"
120        assert icmp
121        assert icmp.type == 3
122        assert icmp.code == 3
123        assert udp
124        assert udp.sport == 4321
125        assert udp.dport == 1234
126
127    @pytest.mark.require_user("root")
128    @pytest.mark.require_progs(["scapy"])
129    def test_sctp(self):
130        self.common_setup()
131
132        # Import in the correct vnet, so at to not confuse Scapy
133        import scapy.all as sp
134
135        # Send an SCTP init, expect an SCTP abort
136        pkt = sp.IP(dst="198.51.100.2") \
137            / sp.SCTP(sport=1111, dport=2222) \
138            / sp.SCTPChunkInit(init_tag=1, n_in_streams=1, n_out_streams=1, a_rwnd=1500)
139        print(pkt)
140        reply = sp.sr1(pkt, timeout=3, verbose=False)
141        print(reply)
142        ip = reply.getlayer(sp.IP)
143        sctp = reply.getlayer(sp.SCTP)
144        abort = reply.getlayer(sp.SCTPChunkAbort)
145        print(sctp)
146
147        assert ip
148        assert ip.src == "198.51.100.2"
149        assert ip.dst == "192.0.2.2"
150        assert sctp
151        assert sctp.sport == 2222
152        assert sctp.dport == 1111
153        assert(abort)
154