#
# SPDX-License-Identifier: BSD-2-Clause
#
# Copyright (c) 2024 Rubicon Communications, LLC (Netgate)
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
# 1. Redistributions of source code must retain the above copyright
#    notice, this list of conditions and the following disclaimer.
# 2. Redistributions in binary form must reproduce the above copyright
#    notice, this list of conditions and the following disclaimer in the
#    documentation and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
# SUCH DAMAGE.

import pytest
import selectors
import socket
import sys
import types
from atf_python.sys.net.tools import ToolsHelper
from atf_python.sys.net.vnet import VnetTestTemplate


class TestNAT64(VnetTestTemplate):
    """Exercise pf NAT64 (``af-to``) translation across three vnets.

    Layout, as far as the topology and the commands below show:

    * vnet1 -- IPv6-only client; every test first installs a default
      IPv6 route via 2001:db8::1 (presumably the tests themselves run
      here -- confirm against VnetTestTemplate).
    * vnet2 -- the translator; runs pf with an ``af-to inet`` rule on
      the IPv6-facing interface and routes IPv4 via 192.0.2.2.
    * vnet3 -- IPv4-only server at 192.0.2.2 (the address the tests
      reach as 64:ff9b::192.0.2.2) with a TCP listener on port 1234
      and a UDP responder on port 4444.
    """

    REQUIRED_MODULES = ["pf"]
    TOPOLOGY = {
        "vnet1": {"ifaces": ["if1"]},
        "vnet2": {"ifaces": ["if1", "if2"]},
        "vnet3": {"ifaces": ["if2"]},
        "if1": {"prefixes6": [("2001:db8::2/64", "2001:db8::1/64")]},
        "if2": {"prefixes4": [("192.0.2.1/24", "192.0.2.2/24")]},
    }

    def vnet3_handler(self, vnet):
        """Run the IPv4 server side: a TCP listener and a UDP responder.

        Serves forever; every TCP payload and every UDP datagram is
        answered with ``b"foo"``.  The sockets and the selector are
        released when the loop is left through an exception.
        """
        ToolsHelper.print_output("/sbin/sysctl net.inet.ip.forwarding=1")
        # test_address_unreachable asserts a hop limit of 62 on the
        # translated error, which presumably traces back to this TTL.
        ToolsHelper.print_output("/sbin/sysctl net.inet.ip.ttl=62")
        # test_udp_checksum needs IPv4 UDP replies without a checksum.
        ToolsHelper.print_output("/sbin/sysctl net.inet.udp.checksum=0")

        with selectors.DefaultSelector() as sel, \
                socket.socket(socket.AF_INET, socket.SOCK_STREAM) as t, \
                socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as u:
            t.bind(("0.0.0.0", 1234))
            t.setblocking(False)
            t.listen()
            # data=None marks the listening socket in the loop below.
            sel.register(t, selectors.EVENT_READ, data=None)

            u.bind(("0.0.0.0", 4444))
            u.setblocking(False)
            sel.register(u, selectors.EVENT_READ, data="UDP")

            while True:
                for key, mask in sel.select(timeout=20):
                    sock = key.fileobj
                    if key.data is None:
                        conn, addr = sock.accept()
                        print(f"Accepted connection from {addr}")
                        data = types.SimpleNamespace(
                            addr=addr, inb=b"", outb=b"")
                        # Only wait for readability: a connected socket
                        # is almost always writable, so also asking for
                        # EVENT_WRITE would make select() return at once
                        # on every iteration.
                        sel.register(conn, selectors.EVENT_READ, data=data)
                    elif key.data == "UDP":
                        recv_data, addr = sock.recvfrom(1024)
                        print(f"Received UDP {recv_data} from {addr}")
                        sock.sendto(b"foo", addr)
                    elif mask & selectors.EVENT_READ:
                        recv_data = sock.recv(1024)
                        print(f"Received TCP {recv_data}")
                        if recv_data:
                            sock.send(b"foo")
                        else:
                            # Empty read: the peer closed.  Drop the
                            # socket, otherwise it stays readable forever.
                            sel.unregister(sock)
                            sock.close()
                    else:
                        print("Unknown event?")

    def vnet2_handler(self, vnet):
        """Configure the translator: IPv4 default route and pf rules."""
        ifname = vnet.iface_alias_map["if1"].name

        ToolsHelper.print_output("/sbin/route add default 192.0.2.2")
        ToolsHelper.print_output("/sbin/pfctl -e")
        ToolsHelper.pf_rules([
            "pass inet6 proto icmp6",
            f"pass in on {ifname} inet6 af-to inet from 192.0.2.1"])

    @pytest.mark.require_user("root")
    def test_tcp_rst(self):
        """A closed TCP port yields RST; an open one yields SYN|ACK."""
        ToolsHelper.print_output("/sbin/route -6 add default 2001:db8::1")

        import scapy.all as sp

        # Send a SYN
        packet = sp.IPv6(dst="64:ff9b::192.0.2.2") \
            / sp.TCP(dport=1222, flags="S")

        # Get a reply
        reply = sp.sr1(packet)

        # We expect to get a RST here.
        tcp = reply.getlayer(sp.TCP)
        assert tcp
        assert "R" in tcp.flags

        # Now try to SYN to an open port
        packet = sp.IPv6(dst="64:ff9b::192.0.2.2") \
            / sp.TCP(dport=1234, flags="S")
        reply = sp.sr1(packet)

        tcp = reply.getlayer(sp.TCP)
        assert tcp

        # We don't get RST
        assert "R" not in tcp.flags

        # We do get SYN|ACK
        assert "S" in tcp.flags
        assert "A" in tcp.flags

    @pytest.mark.require_user("root")
    def test_udp_port_closed(self):
        """UDP to a closed port comes back as an ICMPv6 port unreachable."""
        ToolsHelper.print_output("/sbin/route -6 add default 2001:db8::1")

        import scapy.all as sp

        packet = sp.IPv6(dst="64:ff9b::192.0.2.2") \
            / sp.UDP(dport=1222) / sp.Raw("bar")
        reply = sp.sr1(packet, timeout=3)
        # sr1() returns None on timeout; fail with a clear message.
        assert reply is not None, "no reply received"
        reply.show()

        # We expect an ICMPv6 error, not a UDP reply
        assert not reply.getlayer(sp.UDP)
        icmp = reply.getlayer(sp.ICMPv6DestUnreach)
        assert icmp
        assert icmp.type == 1
        assert icmp.code == 4
        udp = reply.getlayer(sp.UDPerror)
        assert udp
        assert udp.dport == 1222

    @pytest.mark.require_user("root")
    def test_address_unreachable(self):
        """UDP to an unrouted IPv4 address yields ICMPv6 unreachable, code 0."""
        ToolsHelper.print_output("/sbin/route -6 add default 2001:db8::1")

        import scapy.all as sp

        packet = sp.IPv6(dst="64:ff9b::198.51.100.3") \
            / sp.UDP(dport=1222) / sp.Raw("bar")
        reply = sp.sr1(packet, timeout=3)
        # sr1() returns None on timeout; fail with a clear message.
        assert reply is not None, "no reply received"
        reply.show()

        # We expect an ICMPv6 error, not a UDP reply
        assert not reply.getlayer(sp.UDP)
        icmp = reply.getlayer(sp.ICMPv6DestUnreach)
        assert icmp
        assert icmp.type == 1
        assert icmp.code == 0
        udp = reply.getlayer(sp.UDPerror)
        assert udp
        assert udp.dport == 1222

        # Check the hop limit
        ip6 = reply.getlayer(sp.IPv6)
        assert ip6.hlim == 62

    @pytest.mark.require_user("root")
    def test_udp_checksum(self):
        """A checksum-less IPv4 UDP reply gains a checksum once translated."""
        ToolsHelper.print_output("/sbin/route -6 add default 2001:db8::1")

        import scapy.all as sp

        # Send an outbound UDP packet to establish state
        packet = sp.IPv6(dst="64:ff9b::192.0.2.2") \
            / sp.UDP(sport=3333, dport=4444) / sp.Raw("foo")

        # Get a reply
        # We'll send the reply without UDP checksum on the IPv4 side
        # but that's not valid for IPv6, so expect pf to update the checksum.
        reply = sp.sr1(packet, timeout=5)
        # sr1() returns None on timeout; fail with a clear message.
        assert reply is not None, "no reply received"

        udp = reply.getlayer(sp.UDP)
        assert udp
        assert udp.chksum != 0