1# 2# SPDX-License-Identifier: BSD-2-Clause 3# 4# Copyright (c) 2023 Rubicon Communications, LLC (Netgate) 5# 6# Redistribution and use in source and binary forms, with or without 7# modification, are permitted provided that the following conditions 8# are met: 9# 1. Redistributions of source code must retain the above copyright 10# notice, this list of conditions and the following disclaimer. 11# 2. Redistributions in binary form must reproduce the above copyright 12# notice, this list of conditions and the following disclaimer in the 13# documentation and/or other materials provided with the distribution. 14# 15# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND 16# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 17# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 18# ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE 19# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL 20# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS 21# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) 22# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 23# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY 24# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 25# SUCH DAMAGE. 26# 27import pytest 28from atf_python.sys.net.tools import ToolsHelper 29from atf_python.sys.net.vnet import VnetTestTemplate 30import os 31import socket 32import struct 33import sys 34import logging 35logging.getLogger("scapy").setLevel(logging.CRITICAL) 36curdir = os.path.dirname(os.path.realpath(__file__)) 37netpfil_common = curdir + "/../netpfil/common" 38sys.path.append(netpfil_common) 39 40sc = None 41sp = None 42 43def check_igmpv3(args, pkt): 44 igmp = pkt.getlayer(sc.igmpv3.IGMPv3) 45 if igmp is None: 46 return False 47 48 igmpmr = pkt.getlayer(sc.igmpv3.IGMPv3mr) 49 if igmpmr is None: 50 return False 51 52 for r in igmpmr.records: 53 if r.maddr != args["group"]: 54 return False 55 if args["type"] == "join": 56 if r.rtype != 4: 57 return False 58 elif args["type"] == "leave": 59 if r.rtype != 3: 60 return False 61 r.show() 62 63 return True 64 65class TestIGMP(VnetTestTemplate): 66 REQUIRED_MODULES = [] 67 TOPOLOGY = { 68 "vnet1": { "ifaces": [ "if1" ] }, 69 "if1": { "prefixes4": [ ("192.0.2.1/24", "192.0.2.2/24" ) ] }, 70 } 71 72 def setup_method(self, method): 73 global sc 74 if sc is None: 75 import scapy.contrib as _sc 76 import scapy.contrib.igmp 77 import scapy.contrib.igmpv3 78 import scapy.all as _sp 79 sc = _sc 80 sp = _sp 81 super().setup_method(method) 82 83 @pytest.mark.require_progs(["scapy"]) 84 def test_igmp3_join_leave(self): 85 "Test that we send the expected join/leave IGMPv2 messages" 86 87 if1 = self.vnet.iface_alias_map["if1"] 88 89 # Start a background sniff 90 from sniffer import Sniffer 91 expected_pkt = { "type": "join", "group": "230.0.0.1" } 92 sniffer = Sniffer(expected_pkt, check_igmpv3, if1.name, timeout=10) 93 94 # Now join a multicast group, and see if we're getting the igmp packet we expect 95 s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP) 96 mreq = struct.pack("4sl", socket.inet_aton('230.0.0.1'), socket.INADDR_ANY) 97 s.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq) 98 99 # Wait for the sniffer to see the join packet 100 sniffer.join() 101 assert(sniffer.correctPackets > 0) 102 103 # Now leave, check for the packet 104 expected_pkt = { "type": "leave", "group": "230.0.0.1" } 105 sniffer = Sniffer(expected_pkt, check_igmpv3, if1.name) 106 107 s.close() 108 sniffer.join() 109 assert(sniffer.correctPackets > 0) 110