#
# SPDX-License-Identifier: BSD-2-Clause
#
# Copyright (c) 2024 Rubicon Communications, LLC (Netgate)
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
# 1. Redistributions of source code must retain the above copyright
#    notice, this list of conditions and the following disclaimer.
# 2. Redistributions in binary form must reproduce the above copyright
#    notice, this list of conditions and the following disclaimer in the
#    documentation and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
# SUCH DAMAGE.
import pytest
import selectors
import socket
import sys
import types
from atf_python.sys.net.tools import ToolsHelper
from atf_python.sys.net.vnet import VnetTestTemplate


class TestNAT64(VnetTestTemplate):
    """Tests for pf's ``af-to`` (NAT64) translation between three vnets.

    Topology, as declared in ``TOPOLOGY``:

    * ``vnet1`` -- ``if1`` only (IPv6, 2001:db8::/64).  The ``test_*``
      methods install an IPv6 default route via 2001:db8::1 and send
      packets to ``64:ff9b::<IPv4 address>``.
    * ``vnet2`` -- ``if1`` and ``if2``.  Enables pf and translates IPv6
      arriving on ``if1`` to IPv4 sourced from 192.0.2.1.
    * ``vnet3`` -- ``if2`` only (IPv4, 192.0.2.0/24).  Runs small TCP and
      UDP responders.

    NOTE(review): which end of each prefix pair lands in which vnet is
    decided by VnetTestTemplate; the addresses used by the rules and the
    tests suggest vnet2 holds 2001:db8::1 and 192.0.2.1 -- confirm.
    """

    REQUIRED_MODULES = ["pf"]
    TOPOLOGY = {
        "vnet1": {"ifaces": ["if1"]},
        "vnet2": {"ifaces": ["if1", "if2"]},
        "vnet3": {"ifaces": ["if2"]},
        "if1": {"prefixes6": [("2001:db8::2/64", "2001:db8::1/64")]},
        "if2": {"prefixes4": [("192.0.2.1/24", "192.0.2.2/24")]},
    }

    def vnet3_handler(self, vnet):
        """Configure the IPv4 side and serve TCP port 1234 / UDP port 4444.

        Sets three sysctls (IP forwarding on, default TTL 62, UDP checksums
        off), then multiplexes a listening TCP socket and a UDP socket with
        a selector.  Every received TCP chunk or UDP datagram is answered
        with ``b"foo"``.

        :param vnet: vnet object supplied by the test template (unused).
        """
        ToolsHelper.print_output("/sbin/sysctl net.inet.ip.forwarding=1")
        # test_address_unreachable checks that this value shows up as the
        # hop limit of the ICMPv6 error it receives.
        ToolsHelper.print_output("/sbin/sysctl net.inet.ip.ttl=62")
        # test_udp_checksum relies on replies leaving here without a UDP
        # checksum.
        ToolsHelper.print_output("/sbin/sysctl net.inet.udp.checksum=0")

        sel = selectors.DefaultSelector()

        # Listening TCP socket; tagged with data=None so the event loop can
        # tell it apart from accepted connections.
        t = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        t.bind(("0.0.0.0", 1234))
        t.setblocking(False)
        t.listen()
        sel.register(t, selectors.EVENT_READ, data=None)

        # UDP socket; tagged with the string "UDP".
        u = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        u.bind(("0.0.0.0", 4444))
        u.setblocking(False)
        sel.register(u, selectors.EVENT_READ, data="UDP")

        while True:
            events = sel.select(timeout=20)
            for key, mask in events:
                sock = key.fileobj
                if key.data is None:
                    # Listening socket is readable: a connection is pending.
                    conn, addr = sock.accept()
                    print(f"Accepted connection from {addr}")
                    # 'types' must be imported for this to work; it was
                    # missing before, making this branch raise NameError.
                    data = types.SimpleNamespace(addr=addr, inb=b"", outb=b"")
                    # Distinct name so the list being iterated ('events')
                    # is not shadowed.
                    conn_events = selectors.EVENT_READ | selectors.EVENT_WRITE
                    sel.register(conn, conn_events, data=data)
                elif key.data == "UDP":
                    recv_data, addr = sock.recvfrom(1024)
                    print(f"Received UDP {recv_data} from {addr}")
                    sock.sendto(b"foo", addr)
                else:
                    # An accepted TCP connection.
                    if mask & selectors.EVENT_READ:
                        recv_data = sock.recv(1024)
                        print(f"Received TCP {recv_data}")
                        sock.send(b"foo")
                    else:
                        # Any event without EVENT_READ (e.g. a write-only
                        # readiness notification) ends the handler.
                        print("Unknown event?")
                        t.close()
                        u.close()
                        return

    def vnet2_handler(self, vnet):
        """Set up the translator: default IPv4 route, enable pf, load rules.

        The ruleset passes ICMPv6 and applies ``af-to inet from 192.0.2.1``
        to IPv6 traffic arriving on the interface aliased ``if1``.

        :param vnet: vnet object; ``iface_alias_map`` is used to resolve
            the real name of ``if1``.
        """
        ifname = vnet.iface_alias_map["if1"].name

        ToolsHelper.print_output("/sbin/route add default 192.0.2.2")
        ToolsHelper.print_output("/sbin/pfctl -e")
        ToolsHelper.pf_rules([
            "pass inet6 proto icmp6",
            "pass in on %s inet6 af-to inet from 192.0.2.1" % ifname])

    @pytest.mark.require_user("root")
    @pytest.mark.require_progs(["scapy"])
    def test_tcp_rst(self):
        """A SYN to a closed port yields RST; to port 1234 yields SYN|ACK."""
        ToolsHelper.print_output("/sbin/route -6 add default 2001:db8::1")

        # Imported lazily: scapy is only guaranteed by require_progs.
        import scapy.all as sp

        # Send a SYN to a port nothing listens on
        packet = sp.IPv6(dst="64:ff9b::192.0.2.2") \
            / sp.TCP(dport=1222, flags="S")

        # Get a reply
        reply = sp.sr1(packet)
        assert reply is not None, "no reply to SYN on closed port"

        # We expect to get a RST here.
        tcp = reply.getlayer(sp.TCP)
        assert tcp
        assert "R" in tcp.flags

        # Now try to SYN to an open port
        packet = sp.IPv6(dst="64:ff9b::192.0.2.2") \
            / sp.TCP(dport=1234, flags="S")
        reply = sp.sr1(packet)
        assert reply is not None, "no reply to SYN on open port"

        tcp = reply.getlayer(sp.TCP)
        assert tcp

        # We don't get RST
        assert "R" not in tcp.flags

        # We do get SYN|ACK
        assert "S" in tcp.flags
        assert "A" in tcp.flags

    @pytest.mark.require_user("root")
    @pytest.mark.require_progs(["scapy"])
    def test_udp_port_closed(self):
        """UDP to a closed port yields ICMPv6 unreachable, code 4."""
        ToolsHelper.print_output("/sbin/route -6 add default 2001:db8::1")

        import scapy.all as sp

        packet = sp.IPv6(dst="64:ff9b::192.0.2.2") \
            / sp.UDP(dport=1222) / sp.Raw("bar")
        reply = sp.sr1(packet, timeout=3)
        # sr1() returns None on timeout; fail with a clear message instead
        # of an AttributeError on the next line.
        assert reply is not None, "no reply to UDP on closed port"
        print(reply.show())

        # We expect an ICMPv6 error, not a UDP reply
        assert not reply.getlayer(sp.UDP)
        icmp = reply.getlayer(sp.ICMPv6DestUnreach)
        assert icmp
        assert icmp.type == 1
        assert icmp.code == 4
        # The error must quote the offending datagram.
        udp = reply.getlayer(sp.UDPerror)
        assert udp
        assert udp.dport == 1222

    @pytest.mark.require_user("root")
    @pytest.mark.require_progs(["scapy"])
    def test_address_unreachable(self):
        """UDP to 198.51.100.3 yields ICMPv6 unreachable, code 0, hlim 62."""
        ToolsHelper.print_output("/sbin/route -6 add default 2001:db8::1")

        import scapy.all as sp

        packet = sp.IPv6(dst="64:ff9b::198.51.100.3") \
            / sp.UDP(dport=1222) / sp.Raw("bar")
        reply = sp.sr1(packet, timeout=3)
        # sr1() returns None on timeout; fail with a clear message instead
        # of an AttributeError on the next line.
        assert reply is not None, "no reply for unreachable address"
        print(reply.show())

        # We expect an ICMPv6 error, not a UDP reply
        assert not reply.getlayer(sp.UDP)
        icmp = reply.getlayer(sp.ICMPv6DestUnreach)
        assert icmp
        assert icmp.type == 1
        assert icmp.code == 0
        udp = reply.getlayer(sp.UDPerror)
        assert udp
        assert udp.dport == 1222

        # Check the hop limit
        ip6 = reply.getlayer(sp.IPv6)
        assert ip6.hlim == 62

    @pytest.mark.require_user("root")
    @pytest.mark.require_progs(["scapy"])
    def test_udp_checksum(self):
        """A translated UDP reply must carry a non-zero checksum."""
        ToolsHelper.print_output("/sbin/route -6 add default 2001:db8::1")

        import scapy.all as sp

        # Send an outbound UDP packet to establish state
        packet = sp.IPv6(dst="64:ff9b::192.0.2.2") \
            / sp.UDP(sport=3333, dport=4444) / sp.Raw("foo")

        # Get a reply
        # We'll send the reply without UDP checksum on the IPv4 side
        # but that's not valid for IPv6, so expect pf to update the checksum.
        reply = sp.sr1(packet, timeout=5)
        # sr1() returns None on timeout.
        assert reply is not None, "no UDP reply from port 4444"

        udp = reply.getlayer(sp.UDP)
        assert udp
        assert udp.chksum != 0