xref: /freebsd/tests/sys/netpfil/pf/frag6.py (revision 65cc5af1cf88ed124ab16091624e918faa61c7f2)
1import pytest
2import logging
3import threading
4import time
5import random
6logging.getLogger("scapy").setLevel(logging.CRITICAL)
7from atf_python.sys.net.tools import ToolsHelper
8from atf_python.sys.net.vnet import VnetTestTemplate
9
10class DelayedSend(threading.Thread):
11    def __init__(self, packet):
12        threading.Thread.__init__(self)
13        self._packet = packet
14
15        self.start()
16
17    def run(self):
18        import scapy.all as sp
19        time.sleep(1)
20        sp.send(self._packet)
21
22class TestFrag6(VnetTestTemplate):
23    REQUIRED_MODULES = ["pf", "dummymbuf"]
24    TOPOLOGY = {
25        "vnet1": {"ifaces": ["if1"]},
26        "vnet2": {"ifaces": ["if1"]},
27        "if1": {"prefixes6": [("2001:db8::1/64", "2001:db8::2/64")]},
28    }
29
30    def vnet2_handler(self, vnet):
31        ifname = vnet.iface_alias_map["if1"].name
32        ToolsHelper.print_output("/sbin/pfctl -e")
33        ToolsHelper.pf_rules([
34            "scrub fragment reassemble min-ttl 10",
35            "pass",
36            "block in inet6 proto icmp6 icmp6-type echoreq",
37        ])
38        ToolsHelper.print_output("/sbin/pfilctl link -i dummymbuf:inet6 inet6")
39        ToolsHelper.print_output("/sbin/sysctl net.dummymbuf.rules=\"inet6 in %s enlarge 3000;\"" % ifname)
40
41    def check_ping_reply(self, packet):
42        print(packet)
43        return False
44
45    @pytest.mark.require_user("root")
46    @pytest.mark.require_progs(["scapy"])
47    def test_dup_frag_hdr(self):
48        "Test packets with duplicate fragment headers"
49        srv_vnet = self.vnet_map["vnet2"]
50
51        # Import in the correct vnet, so at to not confuse Scapy
52        import scapy.all as sp
53
54        packet = sp.IPv6(src="2001:db8::1", dst="2001:db8::2") \
55            / sp.IPv6ExtHdrFragment(offset = 0, m = 0) \
56            / sp.IPv6ExtHdrFragment(offset = 0, m = 0) \
57            / sp.ICMPv6EchoRequest(data=sp.raw(bytes.fromhex('f00f') * 128))
58
59        # Delay the send so the sniffer is running when we transmit.
60        s = DelayedSend(packet)
61
62        packets = sp.sniff(iface=self.vnet.iface_alias_map["if1"].name,
63            timeout=3)
64        for p in packets:
65            assert not p.getlayer(sp.ICMPv6EchoReply)
66
67    @pytest.mark.require_user("root")
68    @pytest.mark.require_progs(["scapy"])
69    def test_overlong(self):
70        "Test overly long fragmented packet"
71
72        # Import in the correct vnet, so at to not confuse Scapy
73        import scapy.all as sp
74
75        curr = 0
76        pkts = []
77
78        frag_id = random.randint(0,0xffffffff)
79        gran = 1200
80
81        i = 0
82        while curr <= 65535:
83            ipv61 = sp.IPv6(src="2001:db8::1", dst="2001:db8::2")
84            more = True
85            g = gran
86            if curr + gran > 65535:
87                more = False
88                g = 65530 - curr
89            if i == 0:
90                pkt = ipv61 / sp.IPv6ExtHdrHopByHop(options=[sp.PadN(optlen=2), sp.Pad1()]) / \
91                    sp.IPv6ExtHdrFragment(id = frag_id, offset = curr // 8, m = more) / bytes([i] * g)
92            else:
93                pkt = ipv61 / sp.IPv6ExtHdrFragment(id = frag_id, offset = curr // 8, m = more) / bytes([i] * g)
94            pkts.append(pkt)
95            curr += gran
96            i += 1
97
98        sp.send(pkts, inter = 0.1)
99
100class TestFrag6_Overlap(VnetTestTemplate):
101    REQUIRED_MODULES = ["pf"]
102    TOPOLOGY = {
103        "vnet1": {"ifaces": ["if1"]},
104        "vnet2": {"ifaces": ["if1"]},
105        "if1": {"prefixes6": [("2001:db8::1/64", "2001:db8::2/64")]},
106    }
107
108    def vnet2_handler(self, vnet):
109        ToolsHelper.print_output("/sbin/pfctl -e")
110        ToolsHelper.print_output("/sbin/pfctl -x loud")
111        ToolsHelper.pf_rules([
112            "scrub fragment reassemble",
113            "pass",
114        ])
115
116    @pytest.mark.require_user("root")
117    @pytest.mark.require_progs(["scapy"])
118    def test_overlap(self):
119        "Ensure we discard packets with overlapping fragments"
120
121        # Import in the correct vnet, so at to not confuse Scapy
122        import scapy.all as sp
123
124        packet = sp.IPv6(src="2001:db8::1", dst="2001:db8::2") \
125            / sp.ICMPv6EchoRequest(data=sp.raw(bytes.fromhex('f00f') * 90))
126        frags = sp.fragment6(packet, 128)
127        assert len(frags) == 3
128
129        f = frags[0].getlayer(sp.IPv6ExtHdrFragment)
130        # Fragment with overlap
131        overlap = sp.IPv6(src="2001:db8::1", dst="2001:db8::2") \
132            / sp.IPv6ExtHdrFragment(offset = 4, m = 1, id = f.id, nh = f.nh) \
133            / sp.raw(bytes.fromhex('f00f') * 4)
134        frags = [ frags[0], frags[1], overlap, frags[2] ]
135
136        # Delay the send so the sniffer is running when we transmit.
137        s = DelayedSend(frags)
138
139        packets = sp.sniff(iface=self.vnet.iface_alias_map["if1"].name,
140            timeout=3)
141        for p in packets:
142            p.show()
143            assert not p.getlayer(sp.ICMPv6EchoReply)
144