1import pytest 2import logging 3import threading 4import time 5logging.getLogger("scapy").setLevel(logging.CRITICAL) 6from atf_python.sys.net.tools import ToolsHelper 7from atf_python.sys.net.vnet import VnetTestTemplate 8 9class DelayedSend(threading.Thread): 10 def __init__(self, packet): 11 threading.Thread.__init__(self) 12 self._packet = packet 13 14 self.start() 15 16 def run(self): 17 import scapy.all as sp 18 time.sleep(1) 19 sp.send(self._packet) 20 21class TestFrag6(VnetTestTemplate): 22 REQUIRED_MODULES = ["pf"] 23 TOPOLOGY = { 24 "vnet1": {"ifaces": ["if1"]}, 25 "vnet2": {"ifaces": ["if1"]}, 26 "if1": {"prefixes6": [("2001:db8::1/64", "2001:db8::2/64")]}, 27 } 28 29 def vnet2_handler(self, vnet): 30 ToolsHelper.print_output("/sbin/pfctl -e") 31 ToolsHelper.pf_rules([ 32 "scrub fragment reassemble", 33 "pass", 34 "block in inet6 proto icmp6 icmp6-type echoreq", 35 ]) 36 37 def check_ping_reply(self, packet): 38 print(packet) 39 return False 40 41 @pytest.mark.require_user("root") 42 def test_dup_frag_hdr(self): 43 "Test packets with duplicate fragment headers" 44 srv_vnet = self.vnet_map["vnet2"] 45 46 # Import in the correct vnet, so at to not confuse Scapy 47 import scapy.all as sp 48 49 packet = sp.IPv6(src="2001:db8::1", dst="2001:db8::2") \ 50 / sp.IPv6ExtHdrFragment(offset = 0, m = 0) \ 51 / sp.IPv6ExtHdrFragment(offset = 0, m = 0) \ 52 / sp.ICMPv6EchoRequest(data=sp.raw(bytes.fromhex('f00f') * 128)) 53 54 # Delay the send so the sniffer is running when we transmit. 55 s = DelayedSend(packet) 56 57 packets = sp.sniff(iface=self.vnet.iface_alias_map["if1"].name, 58 timeout=3) 59 for p in packets: 60 assert not p.getlayer(sp.ICMPv6EchoReply) 61