1# 2# SPDX-License-Identifier: BSD-2-Clause 3# 4# Copyright (c) 2024 Rubicon Communications, LLC (Netgate) 5# 6# Redistribution and use in source and binary forms, with or without 7# modification, are permitted provided that the following conditions 8# are met: 9# 1. Redistributions of source code must retain the above copyright 10# notice, this list of conditions and the following disclaimer. 11# 2. Redistributions in binary form must reproduce the above copyright 12# notice, this list of conditions and the following disclaimer in the 13# documentation and/or other materials provided with the distribution. 14# 15# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND 16# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 17# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 18# ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE 19# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL 20# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS 21# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) 22# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 23# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY 24# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 25# SUCH DAMAGE. 26 27import pytest 28import selectors 29import socket 30import sys 31import threading 32import time 33from atf_python.sys.net.tools import ToolsHelper 34from atf_python.sys.net.vnet import VnetTestTemplate 35 36class DelayedSend(threading.Thread): 37 def __init__(self, packet): 38 threading.Thread.__init__(self) 39 self._packet = packet 40 41 self.start() 42 43 def run(self): 44 import scapy.all as sp 45 time.sleep(1) 46 sp.send(self._packet) 47 48class TestNAT64(VnetTestTemplate): 49 REQUIRED_MODULES = [ "pf" ] 50 TOPOLOGY = { 51 "vnet1": {"ifaces": ["if1"]}, 52 "vnet2": {"ifaces": ["if1", "if2"]}, 53 "vnet3": {"ifaces": ["if2", "if3"]}, 54 "vnet4": {"ifaces": ["if3"]}, 55 "if1": {"prefixes6": [("2001:db8::2/64", "2001:db8::1/64")]}, 56 "if2": {"prefixes4": [("192.0.2.1/24", "192.0.2.2/24")]}, 57 "if3": {"prefixes4": [("198.51.100.1/24", "198.51.100.2/24")]} 58 } 59 60 def vnet4_handler(self, vnet): 61 ToolsHelper.print_output("/sbin/route add default 198.51.100.1") 62 63 def vnet3_handler(self, vnet): 64 ToolsHelper.print_output("/sbin/sysctl net.inet.ip.forwarding=1") 65 ToolsHelper.print_output("/sbin/sysctl net.inet.ip.ttl=62") 66 ToolsHelper.print_output("/sbin/sysctl net.inet.udp.checksum=0") 67 68 sel = selectors.DefaultSelector() 69 t = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 70 t.bind(("0.0.0.0", 1234)) 71 t.setblocking(False) 72 t.listen() 73 sel.register(t, selectors.EVENT_READ, data=None) 74 75 u = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) 76 u.bind(("0.0.0.0", 4444)) 77 u.setblocking(False) 78 sel.register(u, selectors.EVENT_READ, data="UDP") 79 80 while True: 81 events = sel.select(timeout=20) 82 for key, mask in events: 83 sock = key.fileobj 84 if key.data is None: 85 conn, addr = sock.accept() 86 print(f"Accepted connection from {addr}") 87 data = types.SimpleNamespace(addr=addr, inb=b"", outb=b"") 88 events = selectors.EVENT_READ | selectors.EVENT_WRITE 89 sel.register(conn, events, data=data) 90 elif key.data == "UDP": 91 recv_data, addr = sock.recvfrom(1024) 92 print(f"Received UDP {recv_data} from {addr}") 93 sock.sendto(b"foo", addr) 94 else: 95 if mask & selectors.EVENT_READ: 96 recv_data = sock.recv(1024) 97 print(f"Received TCP {recv_data}") 98 sock.send(b"foo") 99 else: 100 print("Unknown event?") 101 t.close() 102 u.close() 103 return 104 105 def vnet2_handler(self, vnet): 106 ifname = vnet.iface_alias_map["if1"].name 107 108 ToolsHelper.print_output("/sbin/route add default 192.0.2.2") 109 ToolsHelper.print_output("/sbin/pfctl -e") 110 ToolsHelper.pf_rules([ 111 "pass inet6 proto icmp6", 112 "pass in on %s inet6 af-to inet from 192.0.2.1" % ifname]) 113 114 @pytest.mark.require_user("root") 115 @pytest.mark.require_progs(["scapy"]) 116 def test_tcp_rst(self): 117 ToolsHelper.print_output("/sbin/route -6 add default 2001:db8::1") 118 119 import scapy.all as sp 120 121 # Send a SYN 122 packet = sp.IPv6(dst="64:ff9b::192.0.2.2") \ 123 / sp.TCP(dport=1222, flags="S") 124 125 # Get a reply 126 reply = sp.sr1(packet) 127 128 # We expect to get a RST here. 129 tcp = reply.getlayer(sp.TCP) 130 assert tcp 131 assert "R" in tcp.flags 132 133 # Now try to SYN to an open port 134 packet = sp.IPv6(dst="64:ff9b::192.0.2.2") \ 135 / sp.TCP(dport=1234, flags="S") 136 reply = sp.sr1(packet) 137 138 tcp = reply.getlayer(sp.TCP) 139 assert tcp 140 141 # We don't get RST 142 assert "R" not in tcp.flags 143 144 # We do get SYN|ACK 145 assert "S" in tcp.flags 146 assert "A" in tcp.flags 147 148 @pytest.mark.require_user("root") 149 @pytest.mark.require_progs(["scapy"]) 150 def test_udp_port_closed(self): 151 ToolsHelper.print_output("/sbin/route -6 add default 2001:db8::1") 152 153 import scapy.all as sp 154 155 packet = sp.IPv6(dst="64:ff9b::192.0.2.2") \ 156 / sp.UDP(dport=1222) / sp.Raw("bar") 157 reply = sp.sr1(packet, timeout=3) 158 print(reply.show()) 159 160 # We expect an ICMPv6 error, not a UDP reply 161 assert not reply.getlayer(sp.UDP) 162 icmp = reply.getlayer(sp.ICMPv6DestUnreach) 163 assert icmp 164 assert icmp.type == 1 165 assert icmp.code == 4 166 udp = reply.getlayer(sp.UDPerror) 167 assert udp 168 assert udp.dport == 1222 169 170 @pytest.mark.require_user("root") 171 @pytest.mark.require_progs(["scapy"]) 172 def test_address_unreachable(self): 173 ToolsHelper.print_output("/sbin/route -6 add default 2001:db8::1") 174 175 import scapy.all as sp 176 177 packet = sp.IPv6(dst="64:ff9b::203.0.113.2") \ 178 / sp.UDP(dport=1222) / sp.Raw("bar") 179 reply = sp.sr1(packet, timeout=3) 180 print(reply.show()) 181 182 # We expect an ICMPv6 error, not a UDP reply 183 assert not reply.getlayer(sp.UDP) 184 icmp = reply.getlayer(sp.ICMPv6DestUnreach) 185 assert icmp 186 assert icmp.type == 1 187 assert icmp.code == 0 188 udp = reply.getlayer(sp.UDPerror) 189 assert udp 190 assert udp.dport == 1222 191 192 # Check the hop limit 193 ip6 = reply.getlayer(sp.IPv6) 194 assert ip6.hlim == 62 195 196 @pytest.mark.require_user("root") 197 @pytest.mark.require_progs(["scapy"]) 198 def test_udp_checksum(self): 199 ToolsHelper.print_output("/sbin/route -6 add default 2001:db8::1") 200 201 import scapy.all as sp 202 203 # Send an outbound UDP packet to establish state 204 packet = sp.IPv6(dst="64:ff9b::192.0.2.2") \ 205 / sp.UDP(sport=3333, dport=4444) / sp.Raw("foo") 206 207 # Get a reply 208 # We'll send the reply without UDP checksum on the IPv4 side 209 # but that's not valid for IPv6, so expect pf to update the checksum. 210 reply = sp.sr1(packet, timeout=5) 211 212 udp = reply.getlayer(sp.UDP) 213 assert udp 214 assert udp.chksum != 0 215 216 def common_test_source_addr(self, packet): 217 vnet = self.vnet_map["vnet1"] 218 sendif = vnet.iface_alias_map["if1"].name 219 220 import scapy.all as sp 221 222 print("Outbound:\n") 223 packet.show() 224 225 s = DelayedSend(packet) 226 227 # We expect an ICMPv6 error here, where we'll verify the source address of 228 # the outer packet 229 packets = sp.sniff(iface=sendif, timeout=5) 230 231 for reply in packets: 232 print("Reply:\n") 233 reply.show() 234 icmp = reply.getlayer(sp.ICMPv6TimeExceeded) 235 if not icmp: 236 continue 237 238 ip = reply.getlayer(sp.IPv6) 239 assert icmp 240 assert ip.src == "64:ff9b::c000:202" 241 return reply 242 243 # If we don't find the packet we expect to see 244 assert False 245 246 @pytest.mark.require_user("root") 247 @pytest.mark.require_progs(["scapy"]) 248 def test_source_addr_tcp(self): 249 ToolsHelper.print_output("/sbin/route -6 add default 2001:db8::1") 250 import scapy.all as sp 251 252 packet = sp.IPv6(dst="64:ff9b::198.51.100.2", hlim=1) \ 253 / sp.TCP(sport=1111, dport=2222, flags="S") 254 self.common_test_source_addr(packet) 255 256 @pytest.mark.require_user("root") 257 @pytest.mark.require_progs(["scapy"]) 258 def test_source_addr_udp(self): 259 ToolsHelper.print_output("/sbin/route -6 add default 2001:db8::1") 260 import scapy.all as sp 261 262 packet = sp.IPv6(dst="64:ff9b::198.51.100.2", hlim=1) \ 263 / sp.UDP(sport=1111, dport=2222) / sp.Raw("foo") 264 self.common_test_source_addr(packet) 265 266 @pytest.mark.require_user("root") 267 @pytest.mark.require_progs(["scapy"]) 268 def test_source_addr_sctp(self): 269 ToolsHelper.print_output("/sbin/route -6 add default 2001:db8::1") 270 import scapy.all as sp 271 272 packet = sp.IPv6(dst="64:ff9b::198.51.100.2", hlim=1) \ 273 / sp.SCTP(sport=1111, dport=2222) \ 274 / sp.SCTPChunkInit(init_tag=1, n_in_streams=1, n_out_streams=1, a_rwnd=1500) 275 self.common_test_source_addr(packet) 276 277 @pytest.mark.require_user("root") 278 @pytest.mark.require_progs(["scapy"]) 279 def test_source_addr_icmp(self): 280 ToolsHelper.print_output("/sbin/route -6 add default 2001:db8::1") 281 import scapy.all as sp 282 283 packet = sp.IPv6(dst="64:ff9b::198.51.100.2", hlim=1) \ 284 / sp.ICMPv6EchoRequest() / sp.Raw("foo") 285 reply = self.common_test_source_addr(packet) 286 icmp = reply.getlayer(sp.ICMPv6EchoRequest) 287 assert icmp 288