1# 2# SPDX-License-Identifier: BSD-2-Clause 3# 4# Copyright (c) 2025 Rubicon Communications, LLC (Netgate) 5# 6# Redistribution and use in source and binary forms, with or without 7# modification, are permitted provided that the following conditions 8# are met: 9# 1. Redistributions of source code must retain the above copyright 10# notice, this list of conditions and the following disclaimer. 11# 2. Redistributions in binary form must reproduce the above copyright 12# notice, this list of conditions and the following disclaimer in the 13# documentation and/or other materials provided with the distribution. 14# 15# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND 16# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 17# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 18# ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE 19# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL 20# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS 21# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) 22# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 23# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY 24# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 25# SUCH DAMAGE. 
import pytest
from utils import DelayedSend
from atf_python.sys.net.tools import ToolsHelper
from atf_python.sys.net.vnet import VnetTestTemplate


class TestMLD(VnetTestTemplate):
    # pf handling of MLD (Multicast Listener Discovery) packets.
    #
    # Two vnets share the "if1" link.  vnet2 enables pf (see vnet2_handler);
    # the test body uses self.vnet to inject a packet and then inspects the
    # kernel message buffer.
    # NOTE(review): "allow.read_msgbuf" on vnet1 is presumably what lets the
    # test read dmesg from inside the jail -- confirm against atf_python.
    REQUIRED_MODULES = ["pf"]
    TOPOLOGY = {
        "vnet1": {"ifaces": ["if1"], "opts": ["allow.read_msgbuf"]},
        "vnet2": {"ifaces": ["if1"]},
        "if1": {"prefixes6": [("2001:db8::2/64", "2001:db8::1/64")]},
    }

    def vnet2_handler(self, vnet):
        """Enable pf in vnet2 with a single "pass" rule and loud debugging.

        The loud debug level is presumably what makes pf log the
        "Invalid MLD" message the test looks for -- confirm against pf.
        """
        ToolsHelper.print_output("/sbin/pfctl -e")
        ToolsHelper.pf_rules([
            "pass",
        ])
        ToolsHelper.print_output("/sbin/pfctl -x loud")

    def find_mld_reply(self, pkt, ifname):
        """Send pkt on ifname and report whether an MLDv2 report was seen.

        Sniffs ifname for 5 seconds and dumps every captured packet to
        stdout for debugging.  Requires self.sp to be the scapy.all module
        (set by the calling test).

        Returns True if any captured packet carries an ICMPv6MLReport2
        layer, False otherwise.
        """
        pkt.show()
        # Must be created before sniffing starts.
        # NOTE(review): presumably DelayedSend transmits pkt in the
        # background after a short delay, so the sniffer is already
        # listening -- confirm in utils.DelayedSend.
        sender = DelayedSend(pkt, ifname)

        found = False
        # No early exit: every captured packet is shown to aid debugging.
        for reply in self.sp.sniff(iface=ifname, timeout=5):
            reply.show()
            report = reply.getlayer(self.sp.ICMPv6MLReport2)
            if report:
                report.show()
                found = True
        return found

    @pytest.mark.require_user("root")
    @pytest.mark.require_progs(["scapy"])
    def test_router_alert(self):
        """Verify that we allow MLD packets with router alert extension header"""
        ifname = self.vnet.iface_alias_map["if1"].name
        ToolsHelper.print_output("/sbin/ifconfig")

        # Import in the correct vnet, so as to not confuse Scapy
        import scapy.all as sp
        import scapy.contrib as sc
        import scapy.contrib.igmp
        self.sp = sp
        self.sc = sc

        # MLD packets with an incorrect hop limit get dropped.
        # hlim=2 is deliberately wrong: MLD messages must be sent with a
        # hop limit of 1.
        pkt = (
            sp.Ether()
            / sp.IPv6(src="fe80::1%%%s" % ifname, dst="ff02::1", hlim=2)
            / sp.IPv6ExtHdrHopByHop(options=[sp.RouterAlert(value=0)])
            / sp.ICMPv6MLQuery2()
        )
        # We can't reliably test this by checking for a reply, because
        # the other jail may just send a spontaneous MLD reply.
        # The return value is therefore intentionally ignored; the call is
        # only used to send the packet and dump the traffic.
        self.find_mld_reply(pkt, ifname)

        # Check if we logged dropping the MLD packet
        dmesg = ToolsHelper.get_output("/sbin/dmesg")
        assert "Invalid MLD" in dmesg