#
# SPDX-License-Identifier: BSD-2-Clause
#
# Copyright (c) 2024 Rubicon Communications, LLC (Netgate)
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
# 1. Redistributions of source code must retain the above copyright
#    notice, this list of conditions and the following disclaimer.
# 2. Redistributions in binary form must reproduce the above copyright
#    notice, this list of conditions and the following disclaimer in the
#    documentation and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
# SUCH DAMAGE.

import pytest
import selectors
import socket
import sys
import types
from utils import DelayedSend
from atf_python.sys.net.tools import ToolsHelper
from atf_python.sys.net.vnet import VnetTestTemplate


class TestNAT64(VnetTestTemplate):
    """Exercise pf's ``af-to`` (NAT64) translation across a chain of vnets.

    The test body runs in vnet1 (IPv6 only).  vnet2 translates IPv6 to IPv4
    with pf, vnet3 forwards IPv4 and hosts the TCP/UDP responders, and vnet4
    sits behind vnet3 on a second IPv4 network.  Tests reach the IPv4 hosts
    through ``64:ff9b::`` addresses.
    """

    REQUIRED_MODULES = ["pf"]
    TOPOLOGY = {
        "vnet1": {"ifaces": ["if1"]},
        "vnet2": {"ifaces": ["if1", "if2"]},
        "vnet3": {"ifaces": ["if2", "if3"]},
        "vnet4": {"ifaces": ["if3"]},
        "if1": {"prefixes6": [("2001:db8::2/64", "2001:db8::1/64")]},
        "if2": {"prefixes4": [("192.0.2.1/24", "192.0.2.2/24")]},
        "if3": {"prefixes4": [("198.51.100.1/24", "198.51.100.2/24")]},
    }

    def vnet4_handler(self, vnet):
        """Point vnet4's IPv4 default route at 198.51.100.1."""
        ToolsHelper.print_output("/sbin/route add default 198.51.100.1")

    def vnet3_handler(self, vnet):
        """Enable IPv4 forwarding and serve TCP port 1234 and UDP port 4444.

        UDP datagrams and readable TCP connections are answered with
        ``b"foo"``.  The handler returns (closing both listening sockets) on
        the first event for an accepted connection that is not readable.
        """
        ToolsHelper.print_output("/sbin/sysctl net.inet.ip.forwarding=1")
        # test_address_unreachable checks the hop limit derived from this TTL.
        ToolsHelper.print_output("/sbin/sysctl net.inet.ip.ttl=62")
        # Send UDP replies without a checksum; see test_udp_checksum.
        ToolsHelper.print_output("/sbin/sysctl net.inet.udp.checksum=0")

        sel = selectors.DefaultSelector()
        t = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        t.bind(("0.0.0.0", 1234))
        t.setblocking(False)
        t.listen()
        # data=None marks the listening TCP socket.
        sel.register(t, selectors.EVENT_READ, data=None)

        u = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        u.bind(("0.0.0.0", 4444))
        u.setblocking(False)
        sel.register(u, selectors.EVENT_READ, data="UDP")

        while True:
            ready = sel.select(timeout=20)
            for key, mask in ready:
                sock = key.fileobj
                if key.data is None:
                    conn, addr = sock.accept()
                    print(f"Accepted connection from {addr}")
                    data = types.SimpleNamespace(addr=addr, inb=b"", outb=b"")
                    # Use a distinct name so the list of ready events being
                    # iterated is not rebound.
                    interest = selectors.EVENT_READ | selectors.EVENT_WRITE
                    sel.register(conn, interest, data=data)
                elif key.data == "UDP":
                    recv_data, addr = sock.recvfrom(1024)
                    print(f"Received UDP {recv_data} from {addr}")
                    sock.sendto(b"foo", addr)
                elif mask & selectors.EVENT_READ:
                    recv_data = sock.recv(1024)
                    print(f"Received TCP {recv_data}")
                    sock.send(b"foo")
                else:
                    print("Unknown event?")
                    t.close()
                    u.close()
                    return

    def vnet2_handler(self, vnet):
        """Enable pf and translate IPv6 arriving on if1 to IPv4."""
        ifname = vnet.iface_alias_map["if1"].name

        ToolsHelper.print_output("/sbin/route add default 192.0.2.2")
        ToolsHelper.print_output("/sbin/pfctl -e")
        ToolsHelper.pf_rules([
            "pass inet6 proto icmp6",
            f"pass in on {ifname} inet6 af-to inet from 192.0.2.1",
        ])

    @pytest.mark.require_user("root")
    @pytest.mark.require_progs(["scapy"])
    def test_tcp_rst(self):
        """A SYN to a closed port yields RST; to the open port, SYN|ACK."""
        ToolsHelper.print_output("/sbin/route -6 add default 2001:db8::1")

        import scapy.all as sp

        # Send a SYN to a port nothing listens on.
        packet = sp.IPv6(dst="64:ff9b::192.0.2.2") \
            / sp.TCP(dport=1222, flags="S")

        # Bound the wait so a lost reply fails the test instead of hanging.
        reply = sp.sr1(packet, timeout=5)
        assert reply

        # We expect to get a RST here.
        tcp = reply.getlayer(sp.TCP)
        assert tcp
        assert "R" in tcp.flags

        # Now try to SYN to an open port
        packet = sp.IPv6(dst="64:ff9b::192.0.2.2") \
            / sp.TCP(dport=1234, flags="S")
        reply = sp.sr1(packet, timeout=5)
        assert reply

        tcp = reply.getlayer(sp.TCP)
        assert tcp

        # We don't get RST
        assert "R" not in tcp.flags

        # We do get SYN|ACK
        assert "S" in tcp.flags
        assert "A" in tcp.flags

    @pytest.mark.require_user("root")
    @pytest.mark.require_progs(["scapy"])
    def test_udp_port_closed(self):
        """UDP to a closed port yields ICMPv6 destination unreachable, code 4."""
        ToolsHelper.print_output("/sbin/route -6 add default 2001:db8::1")

        import scapy.all as sp

        packet = sp.IPv6(dst="64:ff9b::192.0.2.2") \
            / sp.UDP(dport=1222) / sp.Raw("bar")
        reply = sp.sr1(packet, timeout=3)
        # sr1() returns None on timeout; fail clearly rather than with an
        # AttributeError below.
        assert reply
        reply.show()

        # We expect an ICMPv6 error, not a UDP reply
        assert not reply.getlayer(sp.UDP)
        icmp = reply.getlayer(sp.ICMPv6DestUnreach)
        assert icmp
        assert icmp.type == 1
        assert icmp.code == 4
        udp = reply.getlayer(sp.UDPerror)
        assert udp
        assert udp.dport == 1222

    @pytest.mark.require_user("root")
    @pytest.mark.require_progs(["scapy"])
    def test_address_unreachable(self):
        """UDP to an unrouted IPv4 address yields ICMPv6 unreachable, code 0."""
        ToolsHelper.print_output("/sbin/route -6 add default 2001:db8::1")

        import scapy.all as sp

        # 203.0.113.2 is not on any network in TOPOLOGY.
        packet = sp.IPv6(dst="64:ff9b::203.0.113.2") \
            / sp.UDP(dport=1222) / sp.Raw("bar")
        reply = sp.sr1(packet, timeout=3)
        # sr1() returns None on timeout; fail clearly rather than with an
        # AttributeError below.
        assert reply
        reply.show()

        # We expect an ICMPv6 error, not a UDP reply
        assert not reply.getlayer(sp.UDP)
        icmp = reply.getlayer(sp.ICMPv6DestUnreach)
        assert icmp
        assert icmp.type == 1
        assert icmp.code == 0
        udp = reply.getlayer(sp.UDPerror)
        assert udp
        assert udp.dport == 1222

        # Check the hop limit.  vnet3 sets net.inet.ip.ttl=62; presumably the
        # translating hop accounts for the decrement to 61.
        ip6 = reply.getlayer(sp.IPv6)
        assert ip6.hlim == 61

    @pytest.mark.require_user("root")
    @pytest.mark.require_progs(["scapy"])
    def test_udp_checksum(self):
        """A checksum-less IPv4 UDP reply arrives with a checksum over IPv6."""
        ToolsHelper.print_output("/sbin/route -6 add default 2001:db8::1")

        import scapy.all as sp

        # Send an outbound UDP packet to establish state
        packet = sp.IPv6(dst="64:ff9b::192.0.2.2") \
            / sp.UDP(sport=3333, dport=4444) / sp.Raw("foo")

        # Get a reply
        # We'll send the reply without UDP checksum on the IPv4 side
        # but that's not valid for IPv6, so expect pf to update the checksum.
        reply = sp.sr1(packet, timeout=5)
        assert reply

        udp = reply.getlayer(sp.UDP)
        assert udp
        assert udp.chksum != 0

    def common_test_source_addr(self, packet):
        """Send *packet* and return the ICMPv6 time-exceeded reply it triggers.

        Asserts that the reply's IPv6 source is ``64:ff9b::c000:202`` (the
        NAT64 form of 192.0.2.2) and fails the test if no time-exceeded
        message is captured within the sniff window.
        """
        vnet = self.vnet_map["vnet1"]
        sendif = vnet.iface_alias_map["if1"].name

        import scapy.all as sp

        print("Outbound:\n")
        packet.show()

        s = DelayedSend(packet)

        # We expect an ICMPv6 error here, where we'll verify the source
        # address of the outer packet
        packets = sp.sniff(iface=sendif, timeout=5)

        for reply in packets:
            print("Reply:\n")
            reply.show()
            icmp = reply.getlayer(sp.ICMPv6TimeExceeded)
            if not icmp:
                continue

            ip = reply.getlayer(sp.IPv6)
            assert ip.src == "64:ff9b::c000:202"
            return reply

        # If we don't find the packet we expect to see
        pytest.fail("No ICMPv6 time exceeded reply captured")

    @pytest.mark.require_user("root")
    @pytest.mark.require_progs(["scapy"])
    def test_source_addr_tcp(self):
        """Check the ICMPv6 error source for an expiring TCP SYN."""
        ToolsHelper.print_output("/sbin/route -6 add default 2001:db8::1")
        import scapy.all as sp

        packet = sp.IPv6(dst="64:ff9b::198.51.100.2", hlim=2) \
            / sp.TCP(sport=1111, dport=2222, flags="S")
        self.common_test_source_addr(packet)

    @pytest.mark.require_user("root")
    @pytest.mark.require_progs(["scapy"])
    def test_source_addr_udp(self):
        """Check the ICMPv6 error source for an expiring UDP datagram."""
        ToolsHelper.print_output("/sbin/route -6 add default 2001:db8::1")
        import scapy.all as sp

        packet = sp.IPv6(dst="64:ff9b::198.51.100.2", hlim=2) \
            / sp.UDP(sport=1111, dport=2222) / sp.Raw("foo")
        self.common_test_source_addr(packet)

    @pytest.mark.require_user("root")
    @pytest.mark.require_progs(["scapy"])
    def test_source_addr_sctp(self):
        """Check the ICMPv6 error source for an expiring SCTP INIT."""
        ToolsHelper.print_output("/sbin/route -6 add default 2001:db8::1")
        import scapy.all as sp

        packet = sp.IPv6(dst="64:ff9b::198.51.100.2", hlim=2) \
            / sp.SCTP(sport=1111, dport=2222) \
            / sp.SCTPChunkInit(init_tag=1, n_in_streams=1, n_out_streams=1,
                               a_rwnd=1500)
        self.common_test_source_addr(packet)

    @pytest.mark.require_user("root")
    @pytest.mark.require_progs(["scapy"])
    def test_source_addr_icmp(self):
        """Check the ICMPv6 error source for an expiring echo request.

        Also asserts that the returned error carries an ICMPv6 echo request
        layer.
        """
        ToolsHelper.print_output("/sbin/route -6 add default 2001:db8::1")
        import scapy.all as sp

        packet = sp.IPv6(dst="64:ff9b::198.51.100.2", hlim=2) \
            / sp.ICMPv6EchoRequest() / sp.Raw("foo")
        reply = self.common_test_source_addr(packet)
        icmp = reply.getlayer(sp.ICMPv6EchoRequest)
        assert icmp