1# 2# SPDX-License-Identifier: BSD-2-Clause 3# 4# Copyright (c) 2025 Rubicon Communications, LLC (Netgate) 5# 6# Redistribution and use in source and binary forms, with or without 7# modification, are permitted provided that the following conditions 8# are met: 9# 1. Redistributions of source code must retain the above copyright 10# notice, this list of conditions and the following disclaimer. 11# 2. Redistributions in binary form must reproduce the above copyright 12# notice, this list of conditions and the following disclaimer in the 13# documentation and/or other materials provided with the distribution. 14# 15# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND 16# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 17# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 18# ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE 19# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL 20# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS 21# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) 22# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 23# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY 24# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 25# SUCH DAMAGE. 

import pytest
from utils import DelayedSend
from atf_python.sys.net.tools import ToolsHelper
from atf_python.sys.net.vnet import VnetTestTemplate


class TestIGMP(VnetTestTemplate):
    """Check how pf treats IGMP queries that carry IP options.

    vnet2 enables pf with a plain "pass" ruleset and joins a multicast
    group.  The test body sends IGMP queries over "if1" and sniffs the same
    interface for an IGMP packet that names that group.
    """

    REQUIRED_MODULES = ["pf"]
    TOPOLOGY = {
        "vnet1": {"ifaces": ["if1"]},
        "vnet2": {"ifaces": ["if1"]},
        "if1": {"prefixes4": [("192.0.2.2/24", "192.0.2.1/24")]},
    }

    # Multicast group joined in vnet2 and looked for in the sniffed IGMP
    # traffic.  Kept in one place so the two uses cannot drift apart.
    MCAST_GROUP = "230.0.0.1"

    def vnet2_handler(self, vnet):
        """Set up vnet2: enable pf, load the ruleset, join the group.

        :param vnet: vnet object for vnet2; only its ``iface_alias_map`` is
            used, to resolve the real name of "if1".
        """
        ifname = vnet.iface_alias_map["if1"].name
        ToolsHelper.print_output("/sbin/pfctl -e")
        ToolsHelper.pf_rules([
            "pass",
        ])
        ToolsHelper.print_output("/sbin/pfctl -x loud")
        # Feed mtest a small script on stdin: join the group on ifname,
        # then "s 3600" and "q" (presumably sleep, then quit -- see
        # mtest(8)) so the membership stays in place while the test runs.
        ToolsHelper.print_output(
            "echo \"j %s %s\ns 3600\nq\" | /usr/sbin/mtest"
            % (self.MCAST_GROUP, ifname))

    def find_igmp_reply(self, pkt, ifname):
        """Send ``pkt`` and report whether an IGMP packet for the group shows up.

        Relies on ``self.sp`` and ``self.sc`` having been set by the calling
        test to the ``scapy.all`` and ``scapy.contrib`` modules.

        :param pkt: Scapy packet to transmit.
        :param ifname: interface to sniff on.
        :returns: True if any packet captured within the 5 second sniff
            window has an IGMP layer whose ``gaddr`` equals ``MCAST_GROUP``,
            False otherwise.
        """
        pkt.show()
        # DelayedSend comes from utils; presumably it transmits pkt after a
        # short delay so that the sniff below is already running.  The
        # reference is held for the duration of the capture.
        s = DelayedSend(pkt)

        found = False
        packets = self.sp.sniff(iface=ifname, timeout=5)
        # Walk every captured packet (no early exit) so that the complete
        # capture ends up in the test output for debugging.
        for r in packets:
            r.show()
            igmp = r.getlayer(self.sc.igmp.IGMP)
            if not igmp:
                continue
            igmp.show()
            if igmp.gaddr != self.MCAST_GROUP:
                continue
            found = True
        return found

    @pytest.mark.require_user("root")
    @pytest.mark.require_progs(["scapy"])
    def test_ip_opts(self):
        """Verify that we allow IGMP packets with IP options"""
        ifname = self.vnet.iface_alias_map["if1"].name

        # Import in the correct vnet, so as to not confuse Scapy
        import scapy.all as sp
        import scapy.contrib as sc
        # Imported for its side effect: makes sc.igmp resolvable below.
        import scapy.contrib.igmp
        self.sp = sp
        self.sc = sc

        # We allow IGMP packets with the router alert option
        pkt = sp.IP(dst="224.0.0.1%%%s" % ifname, ttl=1,
                    options=[sp.IPOption_Router_Alert()]) \
            / sc.igmp.IGMP(type=0x11, mrcode=1)
        assert self.find_igmp_reply(pkt, ifname)

        # But not with other options
        pkt = sp.IP(dst="224.0.0.1%%%s" % ifname, ttl=1,
                    options=[sp.IPOption_NOP()]) \
            / sc.igmp.IGMP(type=0x11, mrcode=1)
        assert not self.find_igmp_reply(pkt, ifname)

        # Or with the wrong TTL
        pkt = sp.IP(dst="224.0.0.1%%%s" % ifname, ttl=2,
                    options=[sp.IPOption_Router_Alert()]) \
            / sc.igmp.IGMP(type=0x11, mrcode=1)
        assert not self.find_igmp_reply(pkt, ifname)

        # Or with the wrong destination address
        # NOTE(review): this case also uses ttl=2, so it overlaps with the
        # wrong-TTL case above -- confirm whether ttl=1 was intended here to
        # exercise the destination address on its own.
        pkt = sp.IP(dst="224.0.0.2%%%s" % ifname, ttl=2,
                    options=[sp.IPOption_Router_Alert()]) \
            / sc.igmp.IGMP(type=0x11, mrcode=1)
        assert not self.find_igmp_reply(pkt, ifname)