1# 2# SPDX-License-Identifier: BSD-2-Clause 3# 4# Copyright (c) 2025 Rubicon Communications, LLC (Netgate) 5# 6# Redistribution and use in source and binary forms, with or without 7# modification, are permitted provided that the following conditions 8# are met: 9# 1. Redistributions of source code must retain the above copyright 10# notice, this list of conditions and the following disclaimer. 11# 2. Redistributions in binary form must reproduce the above copyright 12# notice, this list of conditions and the following disclaimer in the 13# documentation and/or other materials provided with the distribution. 14# 15# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND 16# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 17# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 18# ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE 19# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL 20# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS 21# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) 22# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 23# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY 24# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 25# SUCH DAMAGE. 26 27import sys 28import pytest 29import random 30import socket 31import selectors 32from utils import DelayedSend 33from atf_python.sys.net.tools import ToolsHelper 34from atf_python.sys.net.vnet import VnetTestTemplate 35 36class TCPClient: 37 def __init__(self, src, dst, sport, dport, sp): 38 self.src = src 39 self.dst = dst 40 self.sport = sport 41 self.dport = dport 42 self.sp = sp 43 self.seq = random.randrange(1, (2**32)-1) 44 self.ack = 0 45 46 def syn(self): 47 syn = self.sp.IP(src=self.src, dst=self.dst) \ 48 / self.sp.TCP(sport=self.sport, dport=self.dport, flags="S", seq=self.seq) 49 return syn 50 51 def connect(self): 52 syn = self.syn() 53 r = self.sp.sr1(syn, timeout=5) 54 55 assert r 56 t = r.getlayer(self.sp.TCP) 57 assert t 58 assert t.sport == self.dport 59 assert t.dport == self.sport 60 assert t.flags == "SA" 61 62 self.seq += 1 63 self.ack = t.seq + 1 64 ack = self.sp.IP(src=self.src, dst=self.dst) \ 65 / self.sp.TCP(sport=self.sport, dport=self.dport, flags="A", ack=self.ack, seq=self.seq) 66 self.sp.send(ack) 67 68 def send(self, data): 69 length = len(data) 70 pkt = self.sp.IP(src=self.src, dst=self.dst) \ 71 / self.sp.TCP(sport=self.sport, dport=self.dport, ack=self.ack, seq=self.seq, flags="") \ 72 / self.sp.Raw(data) 73 self.seq += length 74 pkt.show() 75 self.sp.send(pkt) 76 77class TestTcp(VnetTestTemplate): 78 REQUIRED_MODULES = [ "pf" ] 79 TOPOLOGY = { 80 "vnet1": {"ifaces": ["if1"]}, 81 "vnet2": {"ifaces": ["if1"]}, 82 "if1": {"prefixes4": [("192.0.2.1/24", "192.0.2.2/24")]}, 83 } 84 85 def vnet2_handler(self, vnet): 86 ToolsHelper.print_output("/usr/sbin/arp -s 192.0.2.3 00:01:02:03:04:05") 87 ToolsHelper.print_output("/sbin/pfctl -e") 88 ToolsHelper.pf_rules([ 89 "pass" 90 ]) 91 ToolsHelper.print_output("/sbin/pfctl -x loud") 92 93 # Start TCP listener 94 sel = selectors.DefaultSelector() 95 t = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 96 t.bind(("0.0.0.0", 1234)) 97 t.listen(100) 98 t.setblocking(False) 99 sel.register(t, selectors.EVENT_READ, data=None) 100 101 while True: 102 events = sel.select(timeout=2) 103 for key, mask in events: 104 sock = key.fileobj 105 if key.data is None: 106 conn, addr = sock.accept() 107 print(f"Accepted connection from {addr}") 108 events = selectors.EVENT_READ | selectors.EVENT_WRITE 109 sel.register(conn, events, data="TCP") 110 else: 111 if mask & selectors.EVENT_READ: 112 recv_data = sock.recv(1024) 113 print(f"Received TCP {recv_data}") 114 ToolsHelper.print_output("/sbin/pfctl -ss -vv") 115 sock.send(recv_data) 116 117 @pytest.mark.require_user("root") 118 @pytest.mark.require_progs(["scapy"]) 119 def test_challenge_ack(self): 120 vnet = self.vnet_map["vnet1"] 121 ifname = vnet.iface_alias_map["if1"].name 122 123 # Import in the correct vnet, so at to not confuse Scapy 124 import scapy.all as sp 125 126 a = TCPClient("192.0.2.3", "192.0.2.2", 1234, 1234, sp) 127 a.connect() 128 a.send(b"foo") 129 130 b = TCPClient("192.0.2.3", "192.0.2.2", 1234, 1234, sp) 131 syn = b.syn() 132 syn.show() 133 s = DelayedSend(syn) 134 packets = sp.sniff(iface=ifname, timeout=3) 135 found = False 136 for p in packets: 137 ip = p.getlayer(sp.IP) 138 if not ip: 139 continue 140 tcp = p.getlayer(sp.TCP) 141 if not tcp: 142 continue 143 144 if ip.src != "192.0.2.2": 145 continue 146 147 p.show() 148 149 assert ip.dst == "192.0.2.3" 150 assert tcp.sport == 1234 151 assert tcp.dport == 1234 152 assert tcp.flags == "A" 153 154 # We only expect one 155 assert not found 156 found = True 157 158 assert found 159