import logging
import threading
import time

import pytest

# Only let CRITICAL records from the "scapy" logger through. This is set at
# module import time, before any test imports scapy (the tests import it
# lazily inside the test body).
logging.getLogger("scapy").setLevel(logging.CRITICAL)

from atf_python.sys.net.tools import ToolsHelper
from atf_python.sys.net.vnet import VnetTestTemplate


class DelayedSend(threading.Thread):
    """Background thread that sends packet(s) with Scapy after a short delay.

    The thread starts itself on construction, sleeps for one second and then
    hands ``packet`` to ``scapy.all.send``.  The delay gives the caller time
    to start sniffing before anything is transmitted.

    Args:
        packet: Whatever ``scapy.all.send`` accepts; the tests in this file
            pass either a single packet or a list of fragments.
    """

    def __init__(self, packet):
        super().__init__()
        self._packet = packet

        # Start immediately so that callers only need to construct the object.
        self.start()

    def run(self):
        """Sleep for one second, then transmit the stored packet(s)."""
        # Imported here rather than at module level, like in the tests, so
        # that scapy is only loaded when something is actually sent.
        import scapy.all as sp

        time.sleep(1)
        sp.send(self._packet)


class TestFrag6(VnetTestTemplate):
    """Send an echo request carrying two fragment headers through pf."""

    REQUIRED_MODULES = ["pf"]
    TOPOLOGY = {
        "vnet1": {"ifaces": ["if1"]},
        "vnet2": {"ifaces": ["if1"]},
        "if1": {"prefixes6": [("2001:db8::1/64", "2001:db8::2/64")]},
    }

    def vnet2_handler(self, vnet):
        """Enable pf and load the ruleset used by this test.

        NOTE(review): presumably invoked by VnetTestTemplate inside vnet2,
        going by the method name -- confirm against the template.
        """
        ToolsHelper.print_output("/sbin/pfctl -e")
        ToolsHelper.pf_rules([
            "scrub fragment reassemble",
            "pass",
            "block in inet6 proto icmp6 icmp6-type echoreq",
        ])

    def check_ping_reply(self, packet):
        """Print ``packet`` and return False.

        Not referenced by the tests visible in this file.
        """
        print(packet)
        return False

    @pytest.mark.require_user("root")
    def test_dup_frag_hdr(self):
        "Test packets with duplicate fragment headers"
        # Import in the correct vnet, so as to not confuse Scapy
        import scapy.all as sp

        # An echo request preceded by two identical fragment headers.
        packet = (
            sp.IPv6(src="2001:db8::1", dst="2001:db8::2")
            / sp.IPv6ExtHdrFragment(offset=0, m=0)
            / sp.IPv6ExtHdrFragment(offset=0, m=0)
            / sp.ICMPv6EchoRequest(data=sp.raw(bytes.fromhex('f00f') * 128))
        )

        # Delay the send so the sniffer is running when we transmit.
        sender = DelayedSend(packet)

        packets = sp.sniff(iface=self.vnet.iface_alias_map["if1"].name,
                           timeout=3)
        # The sender sleeps for 1s and the sniff window is 3s, so it should
        # already be done; join so the thread does not outlive the test.
        sender.join()

        # No echo reply may show up on the interface.
        for p in packets:
            assert not p.getlayer(sp.ICMPv6EchoReply)


class TestFrag6_Overlap(VnetTestTemplate):
    """Send a fragmented echo request with an extra overlapping fragment."""

    REQUIRED_MODULES = ["pf"]
    TOPOLOGY = {
        "vnet1": {"ifaces": ["if1"]},
        "vnet2": {"ifaces": ["if1"]},
        "if1": {"prefixes6": [("2001:db8::1/64", "2001:db8::2/64")]},
    }

    def vnet2_handler(self, vnet):
        """Enable pf, raise its debug level and load the ruleset.

        NOTE(review): presumably invoked by VnetTestTemplate inside vnet2,
        going by the method name -- confirm against the template.
        """
        ToolsHelper.print_output("/sbin/pfctl -e")
        ToolsHelper.print_output("/sbin/pfctl -x loud")
        ToolsHelper.pf_rules([
            "scrub fragment reassemble",
            "pass",
        ])

    @pytest.mark.require_user("root")
    def test_overlap(self):
        "Ensure we discard packets with overlapping fragments"
        # Import in the correct vnet, so as to not confuse Scapy
        import scapy.all as sp

        packet = (
            sp.IPv6(src="2001:db8::1", dst="2001:db8::2")
            / sp.ICMPv6EchoRequest(data=sp.raw(bytes.fromhex('f00f') * 90))
        )
        frags = sp.fragment6(packet, 128)
        # The overlap is inserted between the second and third fragment
        # below, so make sure fragmentation produced exactly three.
        assert len(frags) == 3

        f = frags[0].getlayer(sp.IPv6ExtHdrFragment)
        # Fragment with overlap: reuses the id and next-header value of the
        # first real fragment.
        overlap = (
            sp.IPv6(src="2001:db8::1", dst="2001:db8::2")
            / sp.IPv6ExtHdrFragment(offset=4, m=1, id=f.id, nh=f.nh)
            / sp.raw(bytes.fromhex('f00f') * 4)
        )
        frags = [frags[0], frags[1], overlap, frags[2]]

        # Delay the send so the sniffer is running when we transmit.
        sender = DelayedSend(frags)

        packets = sp.sniff(iface=self.vnet.iface_alias_map["if1"].name,
                           timeout=3)
        # The sender sleeps for 1s and the sniff window is 3s, so it should
        # already be done; join so the thread does not outlive the test.
        sender.join()

        # Dump every captured packet for the test log, and require that
        # none of them is an echo reply.
        for p in packets:
            p.show()
            assert not p.getlayer(sp.ICMPv6EchoReply)