xref: /freebsd/tests/sys/netpfil/pf/tcp.py (revision 63cc817adde5aec0f6446e44c1eee0a7d5908dfd)
1#
2# SPDX-License-Identifier: BSD-2-Clause
3#
4# Copyright (c) 2025 Rubicon Communications, LLC (Netgate)
5#
6# Redistribution and use in source and binary forms, with or without
7# modification, are permitted provided that the following conditions
8# are met:
9# 1. Redistributions of source code must retain the above copyright
10#    notice, this list of conditions and the following disclaimer.
11# 2. Redistributions in binary form must reproduce the above copyright
12#    notice, this list of conditions and the following disclaimer in the
13#    documentation and/or other materials provided with the distribution.
14#
15# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
16# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
17# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
18# ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
19# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
20# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
21# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
22# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
23# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
24# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
25# SUCH DAMAGE.
26
27import sys
28import pytest
29import random
30import socket
31import selectors
32from utils import DelayedSend
33from atf_python.sys.net.tools import ToolsHelper
34from atf_python.sys.net.vnet import VnetTestTemplate
35
36class TCPClient:
37    def __init__(self, src, dst, sport, dport, sp):
38        self.src = src
39        self.dst = dst
40        self.sport = sport
41        self.dport = dport
42        self.sp = sp
43        self.seq = random.randrange(1, (2**32)-1)
44        self.ack = 0
45
46    def syn(self):
47        syn = self.sp.IP(src=self.src, dst=self.dst) \
48            / self.sp.TCP(sport=self.sport, dport=self.dport, flags="S", seq=self.seq)
49        return syn
50
51    def connect(self):
52        syn = self.syn()
53        r = self.sp.sr1(syn, timeout=5)
54
55        assert r
56        t = r.getlayer(self.sp.TCP)
57        assert t
58        assert t.sport == self.dport
59        assert t.dport == self.sport
60        assert t.flags == "SA"
61
62        self.seq += 1
63        self.ack = t.seq + 1
64        ack = self.sp.IP(src=self.src, dst=self.dst) \
65            / self.sp.TCP(sport=self.sport, dport=self.dport, flags="A", ack=self.ack, seq=self.seq)
66        self.sp.send(ack)
67
68    def send(self, data):
69        length = len(data)
70        pkt = self.sp.IP(src=self.src, dst=self.dst) \
71            / self.sp.TCP(sport=self.sport, dport=self.dport, ack=self.ack, seq=self.seq, flags="") \
72            / self.sp.Raw(data)
73        self.seq += length
74        pkt.show()
75        self.sp.send(pkt)
76
77class TestTcp(VnetTestTemplate):
78    REQUIRED_MODULES = [ "pf" ]
79    TOPOLOGY = {
80        "vnet1": {"ifaces": ["if1"]},
81        "vnet2": {"ifaces": ["if1"]},
82        "if1": {"prefixes4": [("192.0.2.1/24", "192.0.2.2/24")]},
83    }
84
85    def vnet2_handler(self, vnet):
86        ToolsHelper.print_output("/usr/sbin/arp -s 192.0.2.3 00:01:02:03:04:05")
87        ToolsHelper.print_output("/sbin/pfctl -e")
88        ToolsHelper.pf_rules([
89            "pass"
90        ])
91        ToolsHelper.print_output("/sbin/pfctl -x loud")
92
93        # Start TCP listener
94        sel = selectors.DefaultSelector()
95        t = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
96        t.bind(("0.0.0.0", 1234))
97        t.listen(100)
98        t.setblocking(False)
99        sel.register(t, selectors.EVENT_READ, data=None)
100
101        while True:
102            events = sel.select(timeout=2)
103            for key, mask in events:
104                sock = key.fileobj
105                if key.data is None:
106                    conn, addr = sock.accept()
107                    print(f"Accepted connection from {addr}")
108                    events = selectors.EVENT_READ | selectors.EVENT_WRITE
109                    sel.register(conn, events, data="TCP")
110                else:
111                    if mask & selectors.EVENT_READ:
112                        recv_data = sock.recv(1024)
113                        print(f"Received TCP {recv_data}")
114                        ToolsHelper.print_output("/sbin/pfctl -ss -vv")
115                        sock.send(recv_data)
116
117    @pytest.mark.require_user("root")
118    @pytest.mark.require_progs(["scapy"])
119    def test_challenge_ack(self):
120        vnet = self.vnet_map["vnet1"]
121        ifname = vnet.iface_alias_map["if1"].name
122
123        # Import in the correct vnet, so at to not confuse Scapy
124        import scapy.all as sp
125
126        a = TCPClient("192.0.2.3", "192.0.2.2", 1234, 1234, sp)
127        a.connect()
128        a.send(b"foo")
129
130        b = TCPClient("192.0.2.3", "192.0.2.2", 1234, 1234, sp)
131        syn = b.syn()
132        syn.show()
133        s = DelayedSend(syn)
134        packets = sp.sniff(iface=ifname, timeout=3)
135        found = False
136        for p in packets:
137            ip = p.getlayer(sp.IP)
138            if not ip:
139                continue
140            tcp = p.getlayer(sp.TCP)
141            if not tcp:
142                continue
143
144            if ip.src != "192.0.2.2":
145                continue
146
147            p.show()
148
149            assert ip.dst == "192.0.2.3"
150            assert tcp.sport == 1234
151            assert tcp.dport == 1234
152            assert tcp.flags == "A"
153
154            # We only expect one
155            assert not found
156            found = True
157
158        assert found
159