xref: /freebsd/tests/sys/netpfil/pf/frag6.py (revision 6c05f3a74f30934ee60919cc97e16ec69b542b06)
1import pytest
2import logging
3import threading
4import time
5logging.getLogger("scapy").setLevel(logging.CRITICAL)
6from atf_python.sys.net.tools import ToolsHelper
7from atf_python.sys.net.vnet import VnetTestTemplate
8
class DelayedSend(threading.Thread):
    """Thread that transmits a packet (or list of packets) after a delay.

    The thread starts itself as soon as it is constructed, so the caller
    can go straight on to start a sniffer and have it running by the time
    the packet goes out.
    """

    def __init__(self, packet, *, delay=1):
        """Store the payload and start the thread immediately.

        packet -- whatever should be handed to scapy's send() later on.
        delay  -- seconds to sleep before sending (defaults to the
                  historical value of one second).
        """
        super().__init__()
        self._packet = packet
        self._delay = delay

        self.start()

    def run(self):
        """Sleep for the configured delay, then send the stored packet."""
        # Scapy is imported here rather than at module level.
        # NOTE(review): presumably so it is loaded only once the test is
        # already running inside its vnet -- confirm.
        import scapy.all as sp

        time.sleep(self._delay)
        sp.send(self._packet)
20
class TestFrag6(VnetTestTemplate):
    """Check that a packet with two IPv6 fragment headers gets no reply.

    Two vnets share the interface "if1"; pf is enabled in vnet2 with
    fragment reassembly and a rule blocking inbound ICMPv6 echo requests.
    """

    REQUIRED_MODULES = ["pf"]
    TOPOLOGY = {
        "vnet1": {"ifaces": ["if1"]},
        "vnet2": {"ifaces": ["if1"]},
        "if1": {"prefixes6": [("2001:db8::1/64", "2001:db8::2/64")]},
    }

    def vnet2_handler(self, vnet):
        """Enable pf in vnet2 and hand it the ruleset for this test."""
        ToolsHelper.print_output("/sbin/pfctl -e")
        ToolsHelper.pf_rules([
            "scrub fragment reassemble",
            "pass",
            "block in inet6 proto icmp6 icmp6-type echoreq",
        ])

    def check_ping_reply(self, packet):
        """Print *packet* and return False.

        NOTE(review): nothing visible in this file calls this helper.
        """
        print(packet)
        return False

    @pytest.mark.require_user("root")
    def test_dup_frag_hdr(self):
        """Test packets with duplicate fragment headers

        Sends an echo request that carries two consecutive fragment
        headers (both offset 0, M flag clear) and asserts that none of
        the packets sniffed on "if1" is an ICMPv6 echo reply.
        """
        # Import in the correct vnet, so as to not confuse Scapy
        import scapy.all as sp

        packet = (
            sp.IPv6(src="2001:db8::1", dst="2001:db8::2")
            / sp.IPv6ExtHdrFragment(offset=0, m=0)
            / sp.IPv6ExtHdrFragment(offset=0, m=0)
            / sp.ICMPv6EchoRequest(data=sp.raw(bytes.fromhex('f00f') * 128))
        )

        # Delay the send so the sniffer is running when we transmit.
        sender = DelayedSend(packet)

        packets = sp.sniff(
            iface=self.vnet.iface_alias_map["if1"].name,
            timeout=3,
        )
        # Make sure the transmit has finished before judging the capture,
        # and do not leave the sender thread running past the test.
        sender.join()

        for p in packets:
            assert not p.getlayer(sp.ICMPv6EchoReply)
61
class TestFrag6_Overlap(VnetTestTemplate):
    """Check that a fragment train containing an overlap gets no reply.

    Two vnets share the interface "if1"; pf is enabled in vnet2 with
    fragment reassembly, a plain "pass" rule and loud debug output.
    """

    REQUIRED_MODULES = ["pf"]
    TOPOLOGY = {
        "vnet1": {"ifaces": ["if1"]},
        "vnet2": {"ifaces": ["if1"]},
        "if1": {"prefixes6": [("2001:db8::1/64", "2001:db8::2/64")]},
    }

    def vnet2_handler(self, vnet):
        """Enable pf in vnet2, raise its debug level and load the rules."""
        ToolsHelper.print_output("/sbin/pfctl -e")
        ToolsHelper.print_output("/sbin/pfctl -x loud")
        ToolsHelper.pf_rules([
            "scrub fragment reassemble",
            "pass",
        ])

    @pytest.mark.require_user("root")
    def test_overlap(self):
        """Ensure we discard packets with overlapping fragments

        Fragments an echo request into three pieces, inserts an extra
        fragment that reuses the id and next-header of the first one,
        sends the lot, and asserts that none of the packets sniffed on
        "if1" is an ICMPv6 echo reply.
        """
        # Import in the correct vnet, so as to not confuse Scapy
        import scapy.all as sp

        packet = (
            sp.IPv6(src="2001:db8::1", dst="2001:db8::2")
            / sp.ICMPv6EchoRequest(data=sp.raw(bytes.fromhex('f00f') * 90))
        )
        frags = sp.fragment6(packet, 128)
        assert len(frags) == 3

        first_hdr = frags[0].getlayer(sp.IPv6ExtHdrFragment)
        # Fragment with overlap
        overlap = (
            sp.IPv6(src="2001:db8::1", dst="2001:db8::2")
            / sp.IPv6ExtHdrFragment(
                offset=4, m=1, id=first_hdr.id, nh=first_hdr.nh
            )
            / sp.raw(bytes.fromhex('f00f') * 4)
        )
        # The overlapping fragment goes in before the final fragment.
        to_send = [frags[0], frags[1], overlap, frags[2]]

        # Delay the send so the sniffer is running when we transmit.
        sender = DelayedSend(to_send)

        packets = sp.sniff(
            iface=self.vnet.iface_alias_map["if1"].name,
            timeout=3,
        )
        # Make sure the transmit has finished before judging the capture,
        # and do not leave the sender thread running past the test.
        sender.join()

        for p in packets:
            p.show()
            assert not p.getlayer(sp.ICMPv6EchoReply)
105