1# 2# SPDX-License-Identifier: BSD-2-Clause 3# 4# Copyright (c) 2023 Rubicon Communications, LLC (Netgate) 5# 6# Redistribution and use in source and binary forms, with or without 7# modification, are permitted provided that the following conditions 8# are met: 9# 1. Redistributions of source code must retain the above copyright 10# notice, this list of conditions and the following disclaimer. 11# 2. Redistributions in binary form must reproduce the above copyright 12# notice, this list of conditions and the following disclaimer in the 13# documentation and/or other materials provided with the distribution. 14# 15# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND 16# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 17# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 18# ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE 19# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL 20# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS 21# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) 22# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 23# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY 24# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 25# SUCH DAMAGE. 26# 27import pytest 28from atf_python.sys.net.tools import ToolsHelper 29from atf_python.sys.net.vnet import VnetTestTemplate 30import os 31import socket 32import struct 33import sys 34import logging 35logging.getLogger("scapy").setLevel(logging.CRITICAL) 36curdir = os.path.dirname(os.path.realpath(__file__)) 37netpfil_common = curdir + "/../netpfil/common" 38sys.path.append(netpfil_common) 39 40sc = None 41sp = None 42 43def check_igmpv3(args, pkt): 44 igmp = pkt.getlayer(sc.igmpv3.IGMPv3) 45 if igmp is None: 46 return False 47 48 igmpmr = pkt.getlayer(sc.igmpv3.IGMPv3mr) 49 if igmpmr is None: 50 return False 51 52 for r in igmpmr.records: 53 if r.maddr != args["group"]: 54 return False 55 if args["type"] == "join": 56 if r.rtype != 4: 57 return False 58 elif args["type"] == "leave": 59 if r.rtype != 3: 60 return False 61 r.show() 62 63 return True 64 65def check_igmpv2(args, pkt): 66 pkt.show() 67 68 igmp = pkt.getlayer(sc.igmp.IGMP) 69 if igmp is None: 70 return False 71 72 if igmp.gaddr != args["group"]: 73 return False 74 75 if args["type"] == "join": 76 if igmp.type != 0x16: 77 return False 78 if args["type"] == "leave": 79 if igmp.type != 0x17: 80 return False 81 82 return True 83 84class TestIGMP(VnetTestTemplate): 85 REQUIRED_MODULES = [] 86 TOPOLOGY = { 87 "vnet1": { "ifaces": [ "if1" ] }, 88 "if1": { "prefixes4": [ ("192.0.2.1/24", "192.0.2.2/24" ) ] }, 89 } 90 91 def setup_method(self, method): 92 global sc 93 if sc is None: 94 import scapy.contrib as _sc 95 import scapy.contrib.igmp 96 import scapy.contrib.igmpv3 97 import scapy.all as _sp 98 sc = _sc 99 sp = _sp 100 super().setup_method(method) 101 102 @pytest.mark.require_progs(["scapy"]) 103 def test_igmp3_join_leave(self): 104 "Test that we send the expected join/leave IGMPv3 messages" 105 106 if1 = self.vnet.iface_alias_map["if1"] 107 108 # Start a background sniff 109 from sniffer import Sniffer 110 expected_pkt = { "type": "join", "group": "230.0.0.1" } 111 sniffer = Sniffer(expected_pkt, check_igmpv3, if1.name, timeout=10) 112 113 # Now join a multicast group, and see if we're getting the igmp packet we expect 114 s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP) 115 mreq = struct.pack("4sl", socket.inet_aton('230.0.0.1'), socket.INADDR_ANY) 116 s.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq) 117 118 # Wait for the sniffer to see the join packet 119 sniffer.join() 120 assert(sniffer.correctPackets > 0) 121 122 # Now leave, check for the packet 123 expected_pkt = { "type": "leave", "group": "230.0.0.1" } 124 sniffer = Sniffer(expected_pkt, check_igmpv3, if1.name) 125 126 s.close() 127 sniffer.join() 128 assert(sniffer.correctPackets > 0) 129 130 @pytest.mark.require_progs(["scapy"]) 131 def test_igmp2_join_leave(self): 132 "Test that we send the expected join/leave IGMPv2 messages" 133 ToolsHelper.print_output("/sbin/sysctl net.inet.igmp.default_version=2") 134 135 if1 = self.vnet.iface_alias_map["if1"] 136 137 # Start a background sniff 138 from sniffer import Sniffer 139 expected_pkt = { "type": "join", "group": "230.0.0.1" } 140 sniffer = Sniffer(expected_pkt, check_igmpv2, if1.name, timeout=10) 141 142 # Now join a multicast group, and see if we're getting the igmp packet we expect 143 s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP) 144 mreq = struct.pack("4sl", socket.inet_aton('230.0.0.1'), socket.INADDR_ANY) 145 s.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq) 146 147 # Wait for the sniffer to see the join packet 148 sniffer.join() 149 assert(sniffer.correctPackets > 0) 150 151 # Now leave, check for the packet 152 expected_pkt = { "type": "leave", "group": "230.0.0.1" } 153 sniffer = Sniffer(expected_pkt, check_igmpv2, if1.name) 154 155 s.close() 156 sniffer.join() 157 assert(sniffer.correctPackets > 0) 158