1# 2# SPDX-License-Identifier: BSD-2-Clause 3# 4# Copyright (c) 2024 Rubicon Communications, LLC (Netgate) 5# 6# Redistribution and use in source and binary forms, with or without 7# modification, are permitted provided that the following conditions 8# are met: 9# 1. Redistributions of source code must retain the above copyright 10# notice, this list of conditions and the following disclaimer. 11# 2. Redistributions in binary form must reproduce the above copyright 12# notice, this list of conditions and the following disclaimer in the 13# documentation and/or other materials provided with the distribution. 14# 15# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND 16# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 17# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 18# ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE 19# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL 20# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS 21# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) 22# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 23# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY 24# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 25# SUCH DAMAGE. 26 27import pytest 28import selectors 29import socket 30import sys 31from utils import DelayedSend 32from atf_python.sys.net.tools import ToolsHelper 33from atf_python.sys.net.vnet import VnetTestTemplate 34 35class TestNAT64(VnetTestTemplate): 36 REQUIRED_MODULES = [ "pf", "pflog" ] 37 TOPOLOGY = { 38 "vnet1": {"ifaces": ["if1"]}, 39 "vnet2": {"ifaces": ["if1", "if2"]}, 40 "vnet3": {"ifaces": ["if2", "if3"]}, 41 "vnet4": {"ifaces": ["if3"]}, 42 "if1": {"prefixes6": [("2001:db8::2/64", "2001:db8::1/64")]}, 43 "if2": {"prefixes4": [("192.0.2.1/24", "192.0.2.2/24")]}, 44 "if3": {"prefixes4": [("198.51.100.1/24", "198.51.100.2/24")]} 45 } 46 47 def vnet4_handler(self, vnet): 48 ToolsHelper.print_output("/sbin/route add default 198.51.100.1") 49 50 def vnet3_handler(self, vnet): 51 ToolsHelper.print_output("/sbin/sysctl net.inet.ip.forwarding=1") 52 ToolsHelper.print_output("/sbin/sysctl net.inet.ip.ttl=62") 53 ToolsHelper.print_output("/sbin/sysctl net.inet.udp.checksum=0") 54 55 sel = selectors.DefaultSelector() 56 t = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 57 t.bind(("0.0.0.0", 1234)) 58 t.setblocking(False) 59 t.listen() 60 sel.register(t, selectors.EVENT_READ, data=None) 61 62 u = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) 63 u.bind(("0.0.0.0", 4444)) 64 u.setblocking(False) 65 sel.register(u, selectors.EVENT_READ, data="UDP") 66 67 while True: 68 events = sel.select(timeout=20) 69 for key, mask in events: 70 sock = key.fileobj 71 if key.data is None: 72 conn, addr = sock.accept() 73 print(f"Accepted connection from {addr}") 74 data = types.SimpleNamespace(addr=addr, inb=b"", outb=b"") 75 events = selectors.EVENT_READ | selectors.EVENT_WRITE 76 sel.register(conn, events, data=data) 77 elif key.data == "UDP": 78 recv_data, addr = sock.recvfrom(1024) 79 print(f"Received UDP {recv_data} from {addr}") 80 sock.sendto(b"foo", addr) 81 else: 82 if mask & selectors.EVENT_READ: 83 recv_data = sock.recv(1024) 84 print(f"Received TCP {recv_data}") 85 sock.send(b"foo") 86 else: 87 print("Unknown event?") 88 t.close() 89 u.close() 90 return 91 92 def vnet2_handler(self, vnet): 93 ifname = vnet.iface_alias_map["if1"].name 94 95 ToolsHelper.print_output("/sbin/sysctl net.inet6.ip6.forwarding=1") 96 ToolsHelper.print_output("/sbin/route add default 192.0.2.2") 97 ToolsHelper.print_output("/sbin/pfctl -e") 98 ToolsHelper.pf_rules([ 99 "block", 100 "pass inet6 proto icmp6 icmp6-type { neighbrsol, neighbradv }", 101 "pass in on %s inet6 af-to inet from 192.0.2.1" % ifname, 102 ]) 103 104 vnet.pipe.send(socket.if_nametoindex("pflog0")) 105 106 @pytest.mark.require_user("root") 107 @pytest.mark.require_progs(["scapy"]) 108 def test_tcp_rst(self): 109 ToolsHelper.print_output("/sbin/route -6 add default 2001:db8::1") 110 111 import scapy.all as sp 112 113 # Send a SYN 114 packet = sp.IPv6(dst="64:ff9b::192.0.2.2") \ 115 / sp.TCP(dport=1222, flags="S") 116 117 # Get a reply 118 reply = sp.sr1(packet) 119 120 # We expect to get a RST here. 121 tcp = reply.getlayer(sp.TCP) 122 assert tcp 123 assert "R" in tcp.flags 124 125 # Now try to SYN to an open port 126 packet = sp.IPv6(dst="64:ff9b::192.0.2.2") \ 127 / sp.TCP(dport=1234, flags="S") 128 reply = sp.sr1(packet) 129 130 tcp = reply.getlayer(sp.TCP) 131 assert tcp 132 133 # We don't get RST 134 assert "R" not in tcp.flags 135 136 # We do get SYN|ACK 137 assert "S" in tcp.flags 138 assert "A" in tcp.flags 139 140 @pytest.mark.require_user("root") 141 @pytest.mark.require_progs(["scapy"]) 142 def test_udp_port_closed(self): 143 ToolsHelper.print_output("/sbin/route -6 add default 2001:db8::1") 144 145 import scapy.all as sp 146 147 packet = sp.IPv6(dst="64:ff9b::192.0.2.2") \ 148 / sp.UDP(dport=1222) / sp.Raw("bar") 149 reply = sp.sr1(packet, timeout=3) 150 print(reply.show()) 151 152 # We expect an ICMPv6 error, not a UDP reply 153 assert not reply.getlayer(sp.UDP) 154 icmp = reply.getlayer(sp.ICMPv6DestUnreach) 155 assert icmp 156 assert icmp.type == 1 157 assert icmp.code == 4 158 udp = reply.getlayer(sp.UDPerror) 159 assert udp 160 assert udp.dport == 1222 161 162 @pytest.mark.require_user("root") 163 @pytest.mark.require_progs(["scapy"]) 164 def test_address_unreachable(self): 165 ToolsHelper.print_output("/sbin/route -6 add default 2001:db8::1") 166 167 import scapy.all as sp 168 169 packet = sp.IPv6(dst="64:ff9b::203.0.113.2") \ 170 / sp.UDP(dport=1222) / sp.Raw("bar") 171 reply = sp.sr1(packet, timeout=3) 172 print(reply.show()) 173 174 # We expect an ICMPv6 error, not a UDP reply 175 assert not reply.getlayer(sp.UDP) 176 icmp = reply.getlayer(sp.ICMPv6DestUnreach) 177 assert icmp 178 assert icmp.type == 1 179 assert icmp.code == 0 180 udp = reply.getlayer(sp.UDPerror) 181 assert udp 182 assert udp.dport == 1222 183 184 # Check the hop limit 185 ip6 = reply.getlayer(sp.IPv6) 186 assert ip6.hlim == 61 187 188 @pytest.mark.require_user("root") 189 @pytest.mark.require_progs(["scapy"]) 190 def test_udp_checksum(self): 191 ToolsHelper.print_output("/sbin/route -6 add default 2001:db8::1") 192 193 import scapy.all as sp 194 195 # Send an outbound UDP packet to establish state 196 packet = sp.IPv6(dst="64:ff9b::192.0.2.2") \ 197 / sp.UDP(sport=3333, dport=4444) / sp.Raw("foo") 198 199 # Get a reply 200 # We'll send the reply without UDP checksum on the IPv4 side 201 # but that's not valid for IPv6, so expect pf to update the checksum. 202 reply = sp.sr1(packet, timeout=5) 203 204 udp = reply.getlayer(sp.UDP) 205 assert udp 206 assert udp.chksum != 0 207 208 def common_test_source_addr(self, packet): 209 vnet = self.vnet_map["vnet1"] 210 sendif = vnet.iface_alias_map["if1"].name 211 212 import scapy.all as sp 213 214 print("Outbound:\n") 215 packet.show() 216 217 s = DelayedSend(packet) 218 219 # We expect an ICMPv6 error here, where we'll verify the source address of 220 # the outer packet 221 packets = sp.sniff(iface=sendif, timeout=5) 222 223 for reply in packets: 224 print("Reply:\n") 225 reply.show() 226 icmp = reply.getlayer(sp.ICMPv6TimeExceeded) 227 if not icmp: 228 continue 229 230 ip = reply.getlayer(sp.IPv6) 231 assert icmp 232 assert ip.src == "64:ff9b::c000:202" 233 return reply 234 235 # If we don't find the packet we expect to see 236 assert False 237 238 @pytest.mark.require_user("root") 239 @pytest.mark.require_progs(["scapy"]) 240 def test_source_addr_tcp(self): 241 ToolsHelper.print_output("/sbin/route -6 add default 2001:db8::1") 242 import scapy.all as sp 243 244 packet = sp.IPv6(dst="64:ff9b::198.51.100.2", hlim=2) \ 245 / sp.TCP(sport=1111, dport=2222, flags="S") 246 self.common_test_source_addr(packet) 247 248 @pytest.mark.require_user("root") 249 @pytest.mark.require_progs(["scapy"]) 250 def test_source_addr_udp(self): 251 ToolsHelper.print_output("/sbin/route -6 add default 2001:db8::1") 252 import scapy.all as sp 253 254 packet = sp.IPv6(dst="64:ff9b::198.51.100.2", hlim=2) \ 255 / sp.UDP(sport=1111, dport=2222) / sp.Raw("foo") 256 self.common_test_source_addr(packet) 257 258 @pytest.mark.require_user("root") 259 @pytest.mark.require_progs(["scapy"]) 260 def test_source_addr_sctp(self): 261 ToolsHelper.print_output("/sbin/route -6 add default 2001:db8::1") 262 import scapy.all as sp 263 264 packet = sp.IPv6(dst="64:ff9b::198.51.100.2", hlim=2) \ 265 / sp.SCTP(sport=1111, dport=2222) \ 266 / sp.SCTPChunkInit(init_tag=1, n_in_streams=1, n_out_streams=1, a_rwnd=1500) 267 self.common_test_source_addr(packet) 268 269 @pytest.mark.require_user("root") 270 @pytest.mark.require_progs(["scapy"]) 271 def test_source_addr_icmp(self): 272 ToolsHelper.print_output("/sbin/route -6 add default 2001:db8::1") 273 import scapy.all as sp 274 275 packet = sp.IPv6(dst="64:ff9b::198.51.100.2", hlim=2) \ 276 / sp.ICMPv6EchoRequest() / sp.Raw("foo") 277 reply = self.common_test_source_addr(packet) 278 icmp = reply.getlayer(sp.ICMPv6EchoRequest) 279 assert icmp 280 281 @pytest.mark.require_user("root") 282 @pytest.mark.require_progs(["scapy"]) 283 def test_bad_len(self): 284 """ 285 PR 288224: we can panic if the IPv6 plen is longer than the packet length. 286 """ 287 ToolsHelper.print_output("/sbin/route -6 add default 2001:db8::1") 288 import scapy.all as sp 289 290 packet = sp.IPv6(dst="64:ff9b::198.51.100.2", hlim=2, plen=512) \ 291 / sp.ICMPv6EchoRequest() / sp.Raw("foo") 292 reply = sp.sr1(packet, timeout=3) 293 # We don't expect a reply to a corrupted packet 294 assert not reply 295 296 @pytest.mark.require_user("root") 297 @pytest.mark.require_progs(["scapy"]) 298 def test_noip6(self): 299 """ 300 PR 288263: link-local target address in icmp6 ADVERT can cause NULL deref 301 """ 302 ifname = self.vnet.iface_alias_map["if1"].name 303 gw_mac = self.vnet.iface_alias_map["if1"].epairb.ether 304 scopeid = self.wait_object(self.vnet_map["vnet2"].pipe) 305 ToolsHelper.print_output("/sbin/route -6 add default 2001:db8::1") 306 307 import scapy.all as sp 308 309 pkt = sp.Ether(dst=gw_mac) \ 310 / sp.IPv6(dst="64:ff9b::203.0.113.2") \ 311 / sp.ICMPv6ND_NA(tgt="FFA2:%x:2821:125F:1D27:B3B2:3F6F:C43C" % scopeid) 312 pkt.show() 313 sp.hexdump(pkt) 314 s = DelayedSend(pkt, sendif=ifname) 315 316 packets = sp.sniff(iface=ifname, timeout=5) 317 for r in packets: 318 r.show() 319 320 # Try scope id that likely doesn't have an interface at all 321 pkt = sp.Ether(dst=gw_mac) \ 322 / sp.IPv6(dst="64:ff9b::203.0.113.2") \ 323 / sp.ICMPv6ND_NA(tgt="FFA2:%x:2821:125F:1D27:B3B2:3F6F:C43C" % 255) 324 pkt.show() 325 sp.hexdump(pkt) 326 s = DelayedSend(pkt, sendif=ifname) 327 328 packets = sp.sniff(iface=ifname, timeout=5) 329 for r in packets: 330 r.show() 331 332 @pytest.mark.require_user("root") 333 @pytest.mark.require_progs(["scapy"]) 334 def test_ttl_zero(self): 335 """ 336 PR 288274: we can use an mbuf after free on TTL = 0 337 """ 338 ifname = self.vnet.iface_alias_map["if1"].name 339 gw_mac = self.vnet.iface_alias_map["if1"].epairb.ether 340 ToolsHelper.print_output("/sbin/route -6 add default 2001:db8::1") 341 342 import scapy.all as sp 343 344 pkt = sp.Ether(dst=gw_mac) \ 345 / sp.IPv6(dst="64:ff9b::192.0.2.2", hlim=0) \ 346 / sp.SCTP(sport=1111, dport=2222) \ 347 / sp.SCTPChunkInit(init_tag=1, n_in_streams=1, n_out_streams=1, \ 348 a_rwnd=1500, params=[ \ 349 sp.SCTPChunkParamIPv4Addr() \ 350 ]) 351 pkt.show() 352 sp.hexdump(pkt) 353 s = DelayedSend(pkt, sendif=ifname) 354 355 packets = sp.sniff(iface=ifname, timeout=5) 356 for r in packets: 357 r.show() 358 359