1import pytest 2import logging 3import random 4logging.getLogger("scapy").setLevel(logging.CRITICAL) 5from utils import DelayedSend 6from atf_python.sys.net.tools import ToolsHelper 7from atf_python.sys.net.vnet import VnetTestTemplate 8 9class TestFrag6(VnetTestTemplate): 10 REQUIRED_MODULES = ["pf", "dummymbuf"] 11 TOPOLOGY = { 12 "vnet1": {"ifaces": ["if1"]}, 13 "vnet2": {"ifaces": ["if1"]}, 14 "if1": {"prefixes6": [("2001:db8::1/64", "2001:db8::2/64")]}, 15 } 16 17 def vnet2_handler(self, vnet): 18 ifname = vnet.iface_alias_map["if1"].name 19 ToolsHelper.print_output("/sbin/pfctl -e") 20 ToolsHelper.pf_rules([ 21 "scrub fragment reassemble min-ttl 10", 22 "pass", 23 "block in inet6 proto icmp6 icmp6-type echoreq", 24 ]) 25 ToolsHelper.print_output("/sbin/pfilctl link -i dummymbuf:inet6 inet6") 26 ToolsHelper.print_output("/sbin/sysctl net.dummymbuf.rules=\"inet6 in %s enlarge 3000;\"" % ifname) 27 28 def check_ping_reply(self, packet): 29 print(packet) 30 return False 31 32 @pytest.mark.require_user("root") 33 @pytest.mark.require_progs(["scapy"]) 34 def test_dup_frag_hdr(self): 35 "Test packets with duplicate fragment headers" 36 srv_vnet = self.vnet_map["vnet2"] 37 38 # Import in the correct vnet, so at to not confuse Scapy 39 import scapy.all as sp 40 41 packet = sp.IPv6(src="2001:db8::1", dst="2001:db8::2") \ 42 / sp.IPv6ExtHdrFragment(offset = 0, m = 0) \ 43 / sp.IPv6ExtHdrFragment(offset = 0, m = 0) \ 44 / sp.ICMPv6EchoRequest(data=sp.raw(bytes.fromhex('f00f') * 128)) 45 46 # Delay the send so the sniffer is running when we transmit. 47 s = DelayedSend(packet) 48 49 packets = sp.sniff(iface=self.vnet.iface_alias_map["if1"].name, 50 timeout=3) 51 for p in packets: 52 assert not p.getlayer(sp.ICMPv6EchoReply) 53 54 @pytest.mark.require_user("root") 55 @pytest.mark.require_progs(["scapy"]) 56 def test_overlong(self): 57 "Test overly long fragmented packet" 58 59 # Import in the correct vnet, so at to not confuse Scapy 60 import scapy.all as sp 61 62 curr = 0 63 pkts = [] 64 65 frag_id = random.randint(0,0xffffffff) 66 gran = 1200 67 68 i = 0 69 while curr <= 65535: 70 ipv61 = sp.IPv6(src="2001:db8::1", dst="2001:db8::2") 71 more = True 72 g = gran 73 if curr + gran > 65535: 74 more = False 75 g = 65530 - curr 76 if i == 0: 77 pkt = ipv61 / sp.IPv6ExtHdrHopByHop(options=[sp.PadN(optlen=2), sp.Pad1()]) / \ 78 sp.IPv6ExtHdrFragment(id = frag_id, offset = curr // 8, m = more) / bytes([i] * g) 79 else: 80 pkt = ipv61 / sp.IPv6ExtHdrFragment(id = frag_id, offset = curr // 8, m = more) / bytes([i] * g) 81 pkts.append(pkt) 82 curr += gran 83 i += 1 84 85 sp.send(pkts, inter = 0.1) 86 87class TestFrag6HopHyHop(VnetTestTemplate): 88 REQUIRED_MODULES = ["pf"] 89 TOPOLOGY = { 90 "vnet1": {"ifaces": ["if1", "if2"]}, 91 "vnet2": {"ifaces": ["if1", "if2"]}, 92 "if1": {"prefixes6": [("2001:db8::1/64", "2001:db8::2/64")]}, 93 "if2": {"prefixes6": [("2001:db8:666::1/64", "2001:db8:1::2/64")]}, 94 } 95 96 def vnet2_handler(self, vnet): 97 ifname = vnet.iface_alias_map["if1"].name 98 ToolsHelper.print_output("/sbin/sysctl net.inet6.ip6.forwarding=1") 99 ToolsHelper.print_output("/usr/sbin/ndp -s 2001:db8:1::1 00:01:02:03:04:05") 100 ToolsHelper.print_output("/sbin/pfctl -e") 101 ToolsHelper.print_output("/sbin/pfctl -x loud") 102 ToolsHelper.pf_rules([ 103 "scrub fragment reassemble min-ttl 10", 104 "pass allow-opts", 105 ]) 106 107 @pytest.mark.require_user("root") 108 @pytest.mark.require_progs(["scapy"]) 109 def test_hop_by_hop(self): 110 "Verify that we reject non-first hop-by-hop headers" 111 if1 = self.vnet.iface_alias_map["if1"].name 112 if2 = self.vnet.iface_alias_map["if2"].name 113 ToolsHelper.print_output("/sbin/route add -6 default 2001:db8::2") 114 ToolsHelper.print_output("/sbin/ping6 -c 1 2001:db8:1::2") 115 116 # Import in the correct vnet, so at to not confuse Scapy 117 import scapy.all as sp 118 119 # A hop-by-hop header is accepted if it's the first header 120 pkt = sp.IPv6(src="2001:db8::1", dst="2001:db8:1::1") \ 121 / sp.IPv6ExtHdrHopByHop() \ 122 / sp.ICMPv6EchoRequest(data=sp.raw(bytes.fromhex('f0') * 30)) 123 pkt.show() 124 125 # Delay the send so the sniffer is running when we transmit. 126 s = DelayedSend(pkt) 127 128 replies = sp.sniff(iface=if2, timeout=3) 129 found = False 130 for p in replies: 131 p.show() 132 ip6 = p.getlayer(sp.IPv6) 133 hbh = p.getlayer(sp.IPv6ExtHdrHopByHop) 134 icmp6 = p.getlayer(sp.ICMPv6EchoRequest) 135 136 if not ip6 or not icmp6: 137 continue 138 assert ip6.src == "2001:db8::1" 139 assert ip6.dst == "2001:db8:1::1" 140 assert hbh 141 assert icmp6 142 found = True 143 assert found 144 145 # A hop-by-hop header causes the packet to be dropped if it's not the 146 # first extension header 147 pkt = sp.IPv6(src="2001:db8::1", dst="2001:db8:1::1") \ 148 / sp.IPv6ExtHdrFragment(offset=0, m=0) \ 149 / sp.IPv6ExtHdrHopByHop() \ 150 / sp.ICMPv6EchoRequest(data=sp.raw(bytes.fromhex('f0') * 30)) 151 pkt2 = sp.IPv6(src="2001:db8::1", dst="2001:db8:1::1") \ 152 / sp.ICMPv6EchoRequest(data=sp.raw(bytes.fromhex('f0') * 30)) 153 154 # Delay the send so the sniffer is running when we transmit. 155 ToolsHelper.print_output("/sbin/ping6 -c 1 2001:db8:1::2") 156 157 s = DelayedSend([ pkt2, pkt ]) 158 replies = sp.sniff(iface=if2, timeout=10) 159 found = False 160 for p in replies: 161 # Expect to find the packet without the hop-by-hop header, not the 162 # one with 163 p.show() 164 ip6 = p.getlayer(sp.IPv6) 165 hbh = p.getlayer(sp.IPv6ExtHdrHopByHop) 166 icmp6 = p.getlayer(sp.ICMPv6EchoRequest) 167 168 if not ip6 or not icmp6: 169 continue 170 assert ip6.src == "2001:db8::1" 171 assert ip6.dst == "2001:db8:1::1" 172 assert not hbh 173 assert icmp6 174 found = True 175 assert found 176 177class TestFrag6_Overlap(VnetTestTemplate): 178 REQUIRED_MODULES = ["pf"] 179 TOPOLOGY = { 180 "vnet1": {"ifaces": ["if1"]}, 181 "vnet2": {"ifaces": ["if1"]}, 182 "if1": {"prefixes6": [("2001:db8::1/64", "2001:db8::2/64")]}, 183 } 184 185 def vnet2_handler(self, vnet): 186 ToolsHelper.print_output("/sbin/pfctl -e") 187 ToolsHelper.print_output("/sbin/pfctl -x loud") 188 ToolsHelper.pf_rules([ 189 "scrub fragment reassemble", 190 "pass", 191 ]) 192 193 @pytest.mark.require_user("root") 194 @pytest.mark.require_progs(["scapy"]) 195 def test_overlap(self): 196 "Ensure we discard packets with overlapping fragments" 197 198 # Import in the correct vnet, so at to not confuse Scapy 199 import scapy.all as sp 200 201 packet = sp.IPv6(src="2001:db8::1", dst="2001:db8::2") \ 202 / sp.ICMPv6EchoRequest(data=sp.raw(bytes.fromhex('f00f') * 90)) 203 frags = sp.fragment6(packet, 128) 204 assert len(frags) == 3 205 206 f = frags[0].getlayer(sp.IPv6ExtHdrFragment) 207 # Fragment with overlap 208 overlap = sp.IPv6(src="2001:db8::1", dst="2001:db8::2") \ 209 / sp.IPv6ExtHdrFragment(offset = 4, m = 1, id = f.id, nh = f.nh) \ 210 / sp.raw(bytes.fromhex('f00f') * 4) 211 frags = [ frags[0], frags[1], overlap, frags[2] ] 212 213 # Delay the send so the sniffer is running when we transmit. 214 s = DelayedSend(frags) 215 216 packets = sp.sniff(iface=self.vnet.iface_alias_map["if1"].name, 217 timeout=3) 218 for p in packets: 219 p.show() 220 assert not p.getlayer(sp.ICMPv6EchoReply) 221 222class TestFrag6_RouteTo(VnetTestTemplate): 223 REQUIRED_MODULES = ["pf"] 224 TOPOLOGY = { 225 "vnet1": {"ifaces": ["if1"]}, 226 "vnet2": {"ifaces": ["if1", "if2"]}, 227 "vnet3": {"ifaces": ["if2"]}, 228 "if1": {"prefixes6": [("2001:db8::1/64", "2001:db8::2/64")]}, 229 "if2": {"prefixes6": [("2001:db8:1::1/64", "2001:db8:1::2/64")]}, 230 } 231 232 def vnet2_handler(self, vnet): 233 if2name = vnet.iface_alias_map["if2"].name 234 ToolsHelper.print_output("/sbin/pfctl -e") 235 ToolsHelper.print_output("/sbin/pfctl -x loud") 236 ToolsHelper.pf_rules([ 237 "scrub fragment reassemble", 238 "pass in route-to (%s 2001:db8:1::2) from 2001:db8::1 to 2001:db8:666::1" % if2name, 239 ]) 240 241 ToolsHelper.print_output("/sbin/ifconfig %s mtu 1300" % if2name) 242 ToolsHelper.print_output("/sbin/sysctl net.inet6.ip6.forwarding=1") 243 244 def vnet3_handler(self, vnet): 245 pass 246 247 @pytest.mark.require_user("root") 248 @pytest.mark.require_progs(["scapy"]) 249 def test_too_big(self): 250 ToolsHelper.print_output("/sbin/route add -6 default 2001:db8::2") 251 252 # Import in the correct vnet, so at to not confuse Scapy 253 import scapy.all as sp 254 255 pkt = sp.IPv6(dst="2001:db8:666::1") \ 256 / sp.ICMPv6EchoRequest(data=sp.raw(bytes.fromhex('f0') * 3000)) 257 frags = sp.fragment6(pkt, 1320) 258 259 reply = sp.sr1(frags, timeout=3) 260 if reply: 261 reply.show() 262 263 assert reply 264 265 ip6 = reply.getlayer(sp.IPv6) 266 icmp6 = reply.getlayer(sp.ICMPv6PacketTooBig) 267 err_ip6 = reply.getlayer(sp.IPerror6) 268 269 assert ip6 270 assert ip6.src == "2001:db8::2" 271 assert ip6.dst == "2001:db8::1" 272 assert icmp6 273 assert icmp6.mtu == 1300 274 assert err_ip6 275 assert err_ip6.src == "2001:db8::1" 276 assert err_ip6.dst == "2001:db8:666::1" 277