1import pytest 2import logging 3import threading 4import time 5import random 6logging.getLogger("scapy").setLevel(logging.CRITICAL) 7from atf_python.sys.net.tools import ToolsHelper 8from atf_python.sys.net.vnet import VnetTestTemplate 9 10class DelayedSend(threading.Thread): 11 def __init__(self, packet): 12 threading.Thread.__init__(self) 13 self._packet = packet 14 15 self.start() 16 17 def run(self): 18 import scapy.all as sp 19 time.sleep(1) 20 sp.send(self._packet) 21 22class TestFrag6(VnetTestTemplate): 23 REQUIRED_MODULES = ["pf", "dummymbuf"] 24 TOPOLOGY = { 25 "vnet1": {"ifaces": ["if1"]}, 26 "vnet2": {"ifaces": ["if1"]}, 27 "if1": {"prefixes6": [("2001:db8::1/64", "2001:db8::2/64")]}, 28 } 29 30 def vnet2_handler(self, vnet): 31 ifname = vnet.iface_alias_map["if1"].name 32 ToolsHelper.print_output("/sbin/pfctl -e") 33 ToolsHelper.pf_rules([ 34 "scrub fragment reassemble min-ttl 10", 35 "pass", 36 "block in inet6 proto icmp6 icmp6-type echoreq", 37 ]) 38 ToolsHelper.print_output("/sbin/pfilctl link -i dummymbuf:inet6 inet6") 39 ToolsHelper.print_output("/sbin/sysctl net.dummymbuf.rules=\"inet6 in %s enlarge 3000;\"" % ifname) 40 41 def check_ping_reply(self, packet): 42 print(packet) 43 return False 44 45 @pytest.mark.require_user("root") 46 @pytest.mark.require_progs(["scapy"]) 47 def test_dup_frag_hdr(self): 48 "Test packets with duplicate fragment headers" 49 srv_vnet = self.vnet_map["vnet2"] 50 51 # Import in the correct vnet, so at to not confuse Scapy 52 import scapy.all as sp 53 54 packet = sp.IPv6(src="2001:db8::1", dst="2001:db8::2") \ 55 / sp.IPv6ExtHdrFragment(offset = 0, m = 0) \ 56 / sp.IPv6ExtHdrFragment(offset = 0, m = 0) \ 57 / sp.ICMPv6EchoRequest(data=sp.raw(bytes.fromhex('f00f') * 128)) 58 59 # Delay the send so the sniffer is running when we transmit. 60 s = DelayedSend(packet) 61 62 packets = sp.sniff(iface=self.vnet.iface_alias_map["if1"].name, 63 timeout=3) 64 for p in packets: 65 assert not p.getlayer(sp.ICMPv6EchoReply) 66 67 @pytest.mark.require_user("root") 68 @pytest.mark.require_progs(["scapy"]) 69 def test_overlong(self): 70 "Test overly long fragmented packet" 71 72 # Import in the correct vnet, so at to not confuse Scapy 73 import scapy.all as sp 74 75 curr = 0 76 pkts = [] 77 78 frag_id = random.randint(0,0xffffffff) 79 gran = 1200 80 81 i = 0 82 while curr <= 65535: 83 ipv61 = sp.IPv6(src="2001:db8::1", dst="2001:db8::2") 84 more = True 85 g = gran 86 if curr + gran > 65535: 87 more = False 88 g = 65530 - curr 89 if i == 0: 90 pkt = ipv61 / sp.IPv6ExtHdrHopByHop(options=[sp.PadN(optlen=2), sp.Pad1()]) / \ 91 sp.IPv6ExtHdrFragment(id = frag_id, offset = curr // 8, m = more) / bytes([i] * g) 92 else: 93 pkt = ipv61 / sp.IPv6ExtHdrFragment(id = frag_id, offset = curr // 8, m = more) / bytes([i] * g) 94 pkts.append(pkt) 95 curr += gran 96 i += 1 97 98 sp.send(pkts, inter = 0.1) 99 100class TestFrag6_Overlap(VnetTestTemplate): 101 REQUIRED_MODULES = ["pf"] 102 TOPOLOGY = { 103 "vnet1": {"ifaces": ["if1"]}, 104 "vnet2": {"ifaces": ["if1"]}, 105 "if1": {"prefixes6": [("2001:db8::1/64", "2001:db8::2/64")]}, 106 } 107 108 def vnet2_handler(self, vnet): 109 ToolsHelper.print_output("/sbin/pfctl -e") 110 ToolsHelper.print_output("/sbin/pfctl -x loud") 111 ToolsHelper.pf_rules([ 112 "scrub fragment reassemble", 113 "pass", 114 ]) 115 116 @pytest.mark.require_user("root") 117 @pytest.mark.require_progs(["scapy"]) 118 def test_overlap(self): 119 "Ensure we discard packets with overlapping fragments" 120 121 # Import in the correct vnet, so at to not confuse Scapy 122 import scapy.all as sp 123 124 packet = sp.IPv6(src="2001:db8::1", dst="2001:db8::2") \ 125 / sp.ICMPv6EchoRequest(data=sp.raw(bytes.fromhex('f00f') * 90)) 126 frags = sp.fragment6(packet, 128) 127 assert len(frags) == 3 128 129 f = frags[0].getlayer(sp.IPv6ExtHdrFragment) 130 # Fragment with overlap 131 overlap = sp.IPv6(src="2001:db8::1", dst="2001:db8::2") \ 132 / sp.IPv6ExtHdrFragment(offset = 4, m = 1, id = f.id, nh = f.nh) \ 133 / sp.raw(bytes.fromhex('f00f') * 4) 134 frags = [ frags[0], frags[1], overlap, frags[2] ] 135 136 # Delay the send so the sniffer is running when we transmit. 137 s = DelayedSend(frags) 138 139 packets = sp.sniff(iface=self.vnet.iface_alias_map["if1"].name, 140 timeout=3) 141 for p in packets: 142 p.show() 143 assert not p.getlayer(sp.ICMPv6EchoReply) 144