#
# SPDX-License-Identifier: BSD-2-Clause
#
# Copyright (c) 2024 Rubicon Communications, LLC (Netgate)
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
# 1. Redistributions of source code must retain the above copyright
#    notice, this list of conditions and the following disclaimer.
# 2. Redistributions in binary form must reproduce the above copyright
#    notice, this list of conditions and the following disclaimer in the
#    documentation and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
# SUCH DAMAGE.
import pytest
import selectors
import socket
import sys
import types
from atf_python.sys.net.tools import ToolsHelper
from atf_python.sys.net.vnet import VnetTestTemplate


class TestNAT64(VnetTestTemplate):
    """Exercise pf's NAT64 (af-to) translation across three vnets.

    The test bodies send IPv6 packets to 64:ff9b::-mapped IPv4 addresses,
    vnet2 loads a pf ``af-to`` rule on its "if1" interface, and vnet3 runs
    small IPv4 TCP/UDP responders.  The test bodies presumably run in vnet1
    (they add an IPv6 default route via 2001:db8::1) -- confirm against
    VnetTestTemplate.
    """

    REQUIRED_MODULES = ["pf"]
    TOPOLOGY = {
        "vnet1": {"ifaces": ["if1"]},
        "vnet2": {"ifaces": ["if1", "if2"]},
        "vnet3": {"ifaces": ["if2"]},
        "if1": {"prefixes6": [("2001:db8::2/64", "2001:db8::1/64")]},
        "if2": {"prefixes4": [("192.0.2.1/24", "192.0.2.2/24")]},
    }

    @staticmethod
    def _serve_event(sel, key, mask):
        """Handle one ready selector event for the vnet3 responders.

        ``key.data`` identifies the socket: ``None`` is the TCP listener,
        ``"UDP"`` is the UDP socket, anything else is an accepted TCP
        connection.
        """
        sock = key.fileobj
        if key.data is None:
            conn, addr = sock.accept()
            print(f"Accepted connection from {addr}")
            data = types.SimpleNamespace(addr=addr, inb=b"", outb=b"")
            # Only watch for readability: a connected socket is almost
            # always writable, so also asking for EVENT_WRITE would make
            # select() return immediately on every iteration.
            sel.register(conn, selectors.EVENT_READ, data=data)
        elif key.data == "UDP":
            recv_data, addr = sock.recvfrom(1024)
            print(f"Received UDP {recv_data} from {addr}")
            sock.sendto(b"foo", addr)
        elif mask & selectors.EVENT_READ:
            recv_data = sock.recv(1024)
            print(f"Received TCP {recv_data}")
            if recv_data:
                sock.send(b"foo")
            else:
                # Zero-length read: the peer closed the connection.
                sel.unregister(sock)
                sock.close()
        else:
            print("Unknown event?")

    def vnet3_handler(self, vnet):
        """Run the IPv4 side: a TCP listener on 1234 and a UDP echo on 4444.

        Both services answer every request with b"foo".  The loop never
        terminates on its own; it runs until the handler is torn down or an
        exception escapes, at which point all sockets are closed.
        """
        ToolsHelper.print_output("/sbin/sysctl net.inet.ip.forwarding=1")
        # test_address_unreachable checks for a hop limit of 62.
        ToolsHelper.print_output("/sbin/sysctl net.inet.ip.ttl=62")
        # test_udp_checksum relies on UDP replies leaving without a checksum.
        ToolsHelper.print_output("/sbin/sysctl net.inet.udp.checksum=0")

        sel = selectors.DefaultSelector()
        t = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        u = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        try:
            t.bind(("0.0.0.0", 1234))
            t.setblocking(False)
            t.listen()
            sel.register(t, selectors.EVENT_READ, data=None)

            u.bind(("0.0.0.0", 4444))
            u.setblocking(False)
            sel.register(u, selectors.EVENT_READ, data="UDP")

            while True:
                for key, mask in sel.select(timeout=20):
                    self._serve_event(sel, key, mask)
        finally:
            # Close any accepted connections that are still registered,
            # then the selector and the two service sockets.
            for key in list(sel.get_map().values()):
                key.fileobj.close()
            sel.close()
            t.close()
            u.close()

    def vnet2_handler(self, vnet):
        """Configure the translator: default route to vnet3 and pf rules."""
        ifname = vnet.iface_alias_map["if1"].name

        ToolsHelper.print_output("/sbin/route add default 192.0.2.2")
        ToolsHelper.print_output("/sbin/pfctl -e")
        ToolsHelper.pf_rules([
            "pass inet6 proto icmp6",
            f"pass in on {ifname} inet6 af-to inet from 192.0.2.1"])

    @pytest.mark.require_user("root")
    @pytest.mark.require_progs(["scapy"])
    def test_tcp_rst(self):
        """A closed TCP port answers with RST, an open one with SYN|ACK."""
        ToolsHelper.print_output("/sbin/route -6 add default 2001:db8::1")

        import scapy.all as sp

        # Send a SYN
        packet = sp.IPv6(dst="64:ff9b::192.0.2.2") \
            / sp.TCP(dport=1222, flags="S")

        # Get a reply; bound the wait so a lost reply fails the test
        # instead of blocking forever.
        reply = sp.sr1(packet, timeout=3)
        assert reply is not None

        # We expect to get a RST here.
        tcp = reply.getlayer(sp.TCP)
        assert tcp
        assert "R" in tcp.flags

        # Now try to SYN to an open port
        packet = sp.IPv6(dst="64:ff9b::192.0.2.2") \
            / sp.TCP(dport=1234, flags="S")
        reply = sp.sr1(packet, timeout=3)
        assert reply is not None

        tcp = reply.getlayer(sp.TCP)
        assert tcp

        # We don't get RST
        assert "R" not in tcp.flags

        # We do get SYN|ACK
        assert "S" in tcp.flags
        assert "A" in tcp.flags

    @pytest.mark.require_user("root")
    @pytest.mark.require_progs(["scapy"])
    def test_udp_port_closed(self):
        """UDP to a closed port yields an ICMPv6 port-unreachable error."""
        ToolsHelper.print_output("/sbin/route -6 add default 2001:db8::1")

        import scapy.all as sp

        packet = sp.IPv6(dst="64:ff9b::192.0.2.2") \
            / sp.UDP(dport=1222) / sp.Raw("bar")
        reply = sp.sr1(packet, timeout=3)
        assert reply is not None
        # show() prints the dissection itself and returns None.
        reply.show()

        # We expect an ICMPv6 error, not a UDP reply
        assert not reply.getlayer(sp.UDP)
        icmp = reply.getlayer(sp.ICMPv6DestUnreach)
        assert icmp
        assert icmp.type == 1
        # Code 4: port unreachable
        assert icmp.code == 4
        udp = reply.getlayer(sp.UDPerror)
        assert udp
        assert udp.dport == 1222

    @pytest.mark.require_user("root")
    @pytest.mark.require_progs(["scapy"])
    def test_address_unreachable(self):
        """UDP to an unrouted IPv4 address yields ICMPv6 no-route."""
        ToolsHelper.print_output("/sbin/route -6 add default 2001:db8::1")

        import scapy.all as sp

        packet = sp.IPv6(dst="64:ff9b::198.51.100.3") \
            / sp.UDP(dport=1222) / sp.Raw("bar")
        reply = sp.sr1(packet, timeout=3)
        assert reply is not None
        # show() prints the dissection itself and returns None.
        reply.show()

        # We expect an ICMPv6 error, not a UDP reply
        assert not reply.getlayer(sp.UDP)
        icmp = reply.getlayer(sp.ICMPv6DestUnreach)
        assert icmp
        assert icmp.type == 1
        # Code 0: no route to destination
        assert icmp.code == 0
        udp = reply.getlayer(sp.UDPerror)
        assert udp
        assert udp.dport == 1222

        # Check the hop limit (presumably carried over from the
        # net.inet.ip.ttl=62 set in vnet3 -- confirm)
        ip6 = reply.getlayer(sp.IPv6)
        assert ip6.hlim == 62

    @pytest.mark.require_user("root")
    @pytest.mark.require_progs(["scapy"])
    def test_udp_checksum(self):
        """A checksum-less IPv4 UDP reply arrives with a valid IPv6 checksum."""
        ToolsHelper.print_output("/sbin/route -6 add default 2001:db8::1")

        import scapy.all as sp

        # Send an outbound UDP packet to establish state
        packet = sp.IPv6(dst="64:ff9b::192.0.2.2") \
            / sp.UDP(sport=3333, dport=4444) / sp.Raw("foo")

        # Get a reply
        # We'll send the reply without UDP checksum on the IPv4 side
        # but that's not valid for IPv6, so expect pf to update the checksum.
        reply = sp.sr1(packet, timeout=5)
        assert reply is not None

        udp = reply.getlayer(sp.UDP)
        assert udp
        assert udp.chksum != 0