xref: /freebsd/tests/sys/netpfil/pf/frag6.py (revision f96f838114a223c0399d23240555099586fbf97a)
1import pytest
2import logging
3import random
4logging.getLogger("scapy").setLevel(logging.CRITICAL)
5from utils import DelayedSend
6from atf_python.sys.net.tools import ToolsHelper
7from atf_python.sys.net.vnet import VnetTestTemplate
8
class TestFrag6(VnetTestTemplate):
    """IPv6 fragment tests against pf with fragment reassembly enabled.

    The test bodies craft packets from 2001:db8::1 to 2001:db8::2 and
    send/sniff on "if1"; vnet2_handler configures pf and dummymbuf for the
    "vnet2" side of the topology.
    """
    REQUIRED_MODULES = ["pf", "dummymbuf"]
    TOPOLOGY = {
        "vnet1": {"ifaces": ["if1"]},
        "vnet2": {"ifaces": ["if1"]},
        "if1": {"prefixes6": [("2001:db8::1/64", "2001:db8::2/64")]},
    }

    def vnet2_handler(self, vnet):
        """Enable pf, load the ruleset and install the dummymbuf rule.

        The ruleset reassembles fragments and blocks inbound ICMPv6 echo
        requests. dummymbuf is linked into the inet6 pfil head and told to
        "enlarge 3000" inbound inet6 packets on this vnet's "if1".
        """
        ifname = vnet.iface_alias_map["if1"].name
        ToolsHelper.print_output("/sbin/pfctl -e")
        ToolsHelper.pf_rules([
            "scrub fragment reassemble min-ttl 10",
            "pass",
            "block in inet6 proto icmp6 icmp6-type echoreq",
        ])
        ToolsHelper.print_output("/sbin/pfilctl link -i dummymbuf:inet6 inet6")
        ToolsHelper.print_output("/sbin/sysctl net.dummymbuf.rules=\"inet6 in %s enlarge 3000;\"" % ifname)

    def check_ping_reply(self, packet):
        """Print *packet* and return False (not referenced by the tests here)."""
        print(packet)
        return False

    @pytest.mark.require_user("root")
    @pytest.mark.require_progs(["scapy"])
    def test_dup_frag_hdr(self):
        """Test packets with duplicate fragment headers

        Sends an echo request carrying two fragment headers and asserts that
        no echo reply is seen on "if1" within the sniff window.
        """
        # Import in the correct vnet, so as to not confuse Scapy
        import scapy.all as sp

        packet = sp.IPv6(src="2001:db8::1", dst="2001:db8::2") \
            / sp.IPv6ExtHdrFragment(offset = 0, m = 0) \
            / sp.IPv6ExtHdrFragment(offset = 0, m = 0) \
            / sp.ICMPv6EchoRequest(data=sp.raw(bytes.fromhex('f00f') * 128))

        # Delay the send so the sniffer is running when we transmit.
        s = DelayedSend(packet)

        packets = sp.sniff(iface=self.vnet.iface_alias_map["if1"].name,
            timeout=3)
        for p in packets:
            assert not p.getlayer(sp.ICMPv6EchoReply)

    @pytest.mark.require_user("root")
    @pytest.mark.require_progs(["scapy"])
    def test_overlong(self):
        """Test overly long fragmented packet

        Builds a train of 1200-byte fragments sharing one random fragment id
        and sends them; the test makes no assertions on what comes back.
        """

        # Import in the correct vnet, so as to not confuse Scapy
        import scapy.all as sp

        curr = 0
        pkts = []

        frag_id = random.randint(0,0xffffffff)
        gran = 1200

        i = 0
        while curr <= 65535:
            ipv61 = sp.IPv6(src="2001:db8::1", dst="2001:db8::2")
            more = True
            g = gran
            if curr + gran > 65535:
                # Last fragment: clear the more-fragments flag and trim the
                # payload so the fragmentable data ends at byte 65530.
                # NOTE(review): presumably the hop-by-hop header carried by
                # the first fragment is what pushes the reassembled packet
                # over 65535 bytes -- confirm.
                more = False
                g = 65530 - curr
            if i == 0:
                # Only the first fragment carries the hop-by-hop header.
                pkt = ipv61 / sp.IPv6ExtHdrHopByHop(options=[sp.PadN(optlen=2), sp.Pad1()]) / \
                    sp.IPv6ExtHdrFragment(id = frag_id, offset = curr // 8, m = more) / bytes([i] * g)
            else:
                pkt = ipv61 / sp.IPv6ExtHdrFragment(id = frag_id, offset = curr // 8, m = more) / bytes([i] * g)
            pkts.append(pkt)
            curr += gran
            i += 1

        sp.send(pkts, inter = 0.1)
86
class TestFrag6HopHyHop(VnetTestTemplate):
    """Hop-by-hop extension header placement tests through a forwarding pf.

    The test sends echo requests from 2001:db8::1 towards 2001:db8:1::1 and
    sniffs on "if2" to see which of them were forwarded.
    """
    REQUIRED_MODULES = ["pf"]
    TOPOLOGY = {
        "vnet1": {"ifaces": ["if1", "if2"]},
        "vnet2": {"ifaces": ["if1", "if2"]},
        "if1": {"prefixes6": [("2001:db8::1/64", "2001:db8::2/64")]},
        "if2": {"prefixes6": [("2001:db8:666::1/64", "2001:db8:1::2/64")]},
    }

    def vnet2_handler(self, vnet):
        """Turn on IPv6 forwarding, add a static neighbour entry and enable pf."""
        ToolsHelper.print_output("/sbin/sysctl net.inet6.ip6.forwarding=1")
        # Static NDP entry for the (otherwise non-existent) destination host.
        ToolsHelper.print_output("/usr/sbin/ndp -s 2001:db8:1::1 00:01:02:03:04:05")
        ToolsHelper.print_output("/sbin/pfctl -e")
        ToolsHelper.print_output("/sbin/pfctl -x loud")
        ToolsHelper.pf_rules([
            "scrub fragment reassemble min-ttl 10",
            "pass allow-opts",
        ])

    def _assert_forwarded_echo(self, sp, packets, expect_hbh):
        """Check the captured echo requests and require at least one.

        Packets without an IPv6 layer or without an ICMPv6 echo request are
        skipped. Every remaining packet must go from 2001:db8::1 to
        2001:db8:1::1 and must (expect_hbh true) or must not (expect_hbh
        false) carry a hop-by-hop header.

        sp is the imported scapy.all module; packets is the sniff result.
        """
        found = False
        for p in packets:
            p.show()
            ip6 = p.getlayer(sp.IPv6)
            hbh = p.getlayer(sp.IPv6ExtHdrHopByHop)
            icmp6 = p.getlayer(sp.ICMPv6EchoRequest)

            if not ip6 or not icmp6:
                continue
            assert ip6.src == "2001:db8::1"
            assert ip6.dst == "2001:db8:1::1"
            if expect_hbh:
                assert hbh
            else:
                assert not hbh
            found = True
        assert found

    @pytest.mark.require_user("root")
    @pytest.mark.require_progs(["scapy"])
    def test_hop_by_hop(self):
        """Verify that we reject non-first hop-by-hop headers"""
        if2 = self.vnet.iface_alias_map["if2"].name
        ToolsHelper.print_output("/sbin/route add -6 default 2001:db8::2")
        ToolsHelper.print_output("/sbin/ping6 -c 1 2001:db8:1::2")

        # Import in the correct vnet, so as to not confuse Scapy
        import scapy.all as sp

        # A hop-by-hop header is accepted if it's the first header
        pkt = sp.IPv6(src="2001:db8::1", dst="2001:db8:1::1") \
            / sp.IPv6ExtHdrHopByHop() \
            / sp.ICMPv6EchoRequest(data=sp.raw(bytes.fromhex('f0') * 30))
        pkt.show()

        # Delay the send so the sniffer is running when we transmit.
        s = DelayedSend(pkt)

        replies = sp.sniff(iface=if2, timeout=3)
        self._assert_forwarded_echo(sp, replies, expect_hbh=True)

        # A hop-by-hop header causes the packet to be dropped if it's not the
        # first extension header
        pkt = sp.IPv6(src="2001:db8::1", dst="2001:db8:1::1") \
            / sp.IPv6ExtHdrFragment(offset=0, m=0) \
            / sp.IPv6ExtHdrHopByHop() \
            / sp.ICMPv6EchoRequest(data=sp.raw(bytes.fromhex('f0') * 30))
        pkt2 = sp.IPv6(src="2001:db8::1", dst="2001:db8:1::1") \
            / sp.ICMPv6EchoRequest(data=sp.raw(bytes.fromhex('f0') * 30))

        # NOTE(review): presumably re-run to keep the path to vnet2 warm
        # before the second send -- confirm.
        ToolsHelper.print_output("/sbin/ping6 -c 1 2001:db8:1::2")

        # Delay the send so the sniffer is running when we transmit.
        s = DelayedSend([ pkt2, pkt ])
        replies = sp.sniff(iface=if2, timeout=10)
        # Expect to find the packet without the hop-by-hop header, not the
        # one with
        self._assert_forwarded_echo(sp, replies, expect_hbh=False)
176
class TestFrag6_Overlap(VnetTestTemplate):
    """Checks that a fragment train containing an overlapping fragment
    elicits no ICMPv6 echo reply."""
    REQUIRED_MODULES = ["pf"]
    TOPOLOGY = {
        "vnet1": {"ifaces": ["if1"]},
        "vnet2": {"ifaces": ["if1"]},
        "if1": {"prefixes6": [("2001:db8::1/64", "2001:db8::2/64")]},
    }

    def vnet2_handler(self, vnet):
        """Enable pf with loud debugging and a reassembling scrub ruleset."""
        for command in ("/sbin/pfctl -e", "/sbin/pfctl -x loud"):
            ToolsHelper.print_output(command)
        ruleset = [
            "scrub fragment reassemble",
            "pass",
        ]
        ToolsHelper.pf_rules(ruleset)

    @pytest.mark.require_user("root")
    @pytest.mark.require_progs(["scapy"])
    def test_overlap(self):
        """Ensure we discard packets with overlapping fragments"""

        # Scapy must be imported from inside the vnet the test runs in.
        import scapy.all as sp

        # Split one echo request into exactly three fragments.
        echo_payload = sp.raw(bytes.fromhex('f00f') * 90)
        echo = (
            sp.IPv6(src="2001:db8::1", dst="2001:db8::2")
            / sp.ICMPv6EchoRequest(data=echo_payload)
        )
        frags = sp.fragment6(echo, 128)
        assert len(frags) == 3

        # Forge an extra fragment that reuses the id and next-header of the
        # genuine first fragment but sits at offset 4.
        first_hdr = frags[0].getlayer(sp.IPv6ExtHdrFragment)
        overlapping = (
            sp.IPv6(src="2001:db8::1", dst="2001:db8::2")
            / sp.IPv6ExtHdrFragment(offset=4, m=1, id=first_hdr.id,
                                    nh=first_hdr.nh)
            / sp.raw(bytes.fromhex('f00f') * 4)
        )

        # Slot the forged fragment in just ahead of the final genuine one.
        burst = frags[:2] + [overlapping] + frags[2:]

        # The send is deferred so that the sniffer below is already running.
        sender = DelayedSend(burst)

        capture_if = self.vnet.iface_alias_map["if1"].name
        captured = sp.sniff(iface=capture_if, timeout=3)
        for frame in captured:
            frame.show()
            assert not frame.getlayer(sp.ICMPv6EchoReply)
221
class TestFrag6_RouteTo(VnetTestTemplate):
    """Checks the ICMPv6 "packet too big" error produced when fragmented
    traffic is route-to'd out of an interface with a 1300-byte MTU."""
    REQUIRED_MODULES = ["pf"]
    TOPOLOGY = {
        "vnet1": {"ifaces": ["if1"]},
        "vnet2": {"ifaces": ["if1", "if2"]},
        "vnet3": {"ifaces": ["if2"]},
        "if1": {"prefixes6": [("2001:db8::1/64", "2001:db8::2/64")]},
        "if2": {"prefixes6": [("2001:db8:1::1/64", "2001:db8:1::2/64")]},
    }

    def vnet2_handler(self, vnet):
        """Enable pf with a route-to rule, shrink the if2 MTU and enable
        IPv6 forwarding."""
        out_if = vnet.iface_alias_map["if2"].name

        ToolsHelper.print_output("/sbin/pfctl -e")
        ToolsHelper.print_output("/sbin/pfctl -x loud")

        route_rule = "pass in route-to (%s 2001:db8:1::2) from 2001:db8::1 to 2001:db8:666::1" % out_if
        ToolsHelper.pf_rules(["scrub fragment reassemble", route_rule])

        mtu_cmd = "/sbin/ifconfig %s mtu 1300" % out_if
        ToolsHelper.print_output(mtu_cmd)
        ToolsHelper.print_output("/sbin/sysctl net.inet6.ip6.forwarding=1")

    def vnet3_handler(self, vnet):
        # Nothing to configure in vnet3.
        pass

    @pytest.mark.require_user("root")
    @pytest.mark.require_progs(["scapy"])
    def test_too_big(self):
        # Send a fragmented echo request (fragments larger than the 1300-byte
        # MTU set on if2) and verify the packet-too-big error that comes back.
        ToolsHelper.print_output("/sbin/route add -6 default 2001:db8::2")

        # Scapy must be imported from inside the vnet the test runs in.
        import scapy.all as sp

        payload = sp.raw(bytes.fromhex('f0') * 3000)
        echo = sp.IPv6(dst="2001:db8:666::1") / sp.ICMPv6EchoRequest(data=payload)
        reply = sp.sr1(sp.fragment6(echo, 1320), timeout=3)

        assert reply
        reply.show()

        outer = reply.getlayer(sp.IPv6)
        too_big = reply.getlayer(sp.ICMPv6PacketTooBig)
        quoted = reply.getlayer(sp.IPerror6)

        # Outer header: the error comes from 2001:db8::2 back to the sender.
        assert outer
        assert outer.src == "2001:db8::2"
        assert outer.dst == "2001:db8::1"
        # The advertised MTU matches the one configured on if2.
        assert too_big
        assert too_big.mtu == 1300
        # The quoted packet is the one we originally sent.
        assert quoted
        assert quoted.src == "2001:db8::1"
        assert quoted.dst == "2001:db8:666::1"
277