1# 2# SPDX-License-Identifier: BSD-2-Clause 3# 4# Copyright (c) 2023 Rubicon Communications, LLC (Netgate) 5# 6# Redistribution and use in source and binary forms, with or without 7# modification, are permitted provided that the following conditions 8# are met: 9# 1. Redistributions of source code must retain the above copyright 10# notice, this list of conditions and the following disclaimer. 11# 2. Redistributions in binary form must reproduce the above copyright 12# notice, this list of conditions and the following disclaimer in the 13# documentation and/or other materials provided with the distribution. 14# 15# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND 16# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 17# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 18# ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE 19# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL 20# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS 21# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) 22# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 23# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY 24# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 25# SUCH DAMAGE. 
#
import pytest
from atf_python.sys.net.tools import ToolsHelper
from atf_python.sys.net.vnet import VnetTestTemplate
import os
import socket
import struct
import sys
import logging
# Quiet scapy before the sniffer helper (imported below) pulls it in.
logging.getLogger("scapy").setLevel(logging.CRITICAL)
curdir = os.path.dirname(os.path.realpath(__file__))
netpfil_common = curdir + "/../netpfil/common"
sys.path.append(netpfil_common)
from sniffer import Sniffer

# Lazily populated by TestIGMP.setup_method(): scapy.contrib and scapy.all.
sc = None
sp = None

# IGMPv3 group record types (RFC 3376, section 4.2.12).
IGMPV3_CHANGE_TO_INCLUDE_MODE = 3   # any-source "leave": INCLUDE with no sources
IGMPV3_CHANGE_TO_EXCLUDE_MODE = 4   # any-source "join": EXCLUDE with no sources

# Record type every group record must carry for a given args["type"].
_EXPECTED_RTYPE = {
    "join": IGMPV3_CHANGE_TO_EXCLUDE_MODE,
    "leave": IGMPV3_CHANGE_TO_INCLUDE_MODE,
}


def check_igmpv3(args, pkt):
    """Sniffer callback: decide whether pkt is the expected IGMPv3 report.

    args is a dict with:
      "group": the multicast group address (dotted-quad string) that every
               group record in the report must name;
      "type":  "join" or "leave", selecting the record type every group
               record must carry.  Any other value skips the type check.

    Returns True only when pkt has both the IGMPv3 and IGMPv3 membership
    report layers, contains at least one group record, and all of its
    records match.  Returns False otherwise.
    """
    if pkt.getlayer(sc.igmpv3.IGMPv3) is None:
        return False

    report = pkt.getlayer(sc.igmpv3.IGMPv3mr)
    if report is None:
        return False

    # A report without any group record cannot announce a join or a leave;
    # without this guard the loop below would be skipped and we would
    # wrongly accept it.
    if not report.records:
        return False

    wanted_rtype = _EXPECTED_RTYPE.get(args["type"])
    for record in report.records:
        if record.maddr != args["group"]:
            return False
        if wanted_rtype is not None and record.rtype != wanted_rtype:
            return False
        # Dump the accepted record to the test output to ease debugging.
        record.show()

    return True


class TestIGMP(VnetTestTemplate):
    """IGMP tests run from a single vnet with one epair interface."""

    REQUIRED_MODULES = []
    TOPOLOGY = {
        "vnet1": { "ifaces": [ "if1" ] },
        "if1": { "prefixes4": [ ("192.0.2.1/24", "192.0.2.2/24" ) ] },
    }

    def setup_method(self, method):
        """Import scapy on first use, then run the template's setup."""
        # Both names are module globals; without declaring sp here the
        # assignment below would only create a local and leave sp as None.
        global sc, sp
        if sc is None:
            import scapy.contrib as _sc
            import scapy.contrib.igmp
            import scapy.contrib.igmpv3
            import scapy.all as _sp
            sc = _sc
            sp = _sp
        super().setup_method(method)

    def test_igmp3_join_leave(self):
        """Test that we send the expected join/leave IGMPv3 messages"""

        if1 = self.vnet.iface_alias_map["if1"]
        group = "230.0.0.1"

        # Start a background sniff
        expected_pkt = { "type": "join", "group": group }
        sniffer = Sniffer(expected_pkt, check_igmpv3, if1.name, timeout=10)

        # Now join a multicast group, and see if we're getting the igmp packet we expect
        s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
        try:
            # "=" selects standard sizes with no padding, so this is exactly
            # the 8-byte struct ip_mreq { imr_multiaddr, imr_interface }.
            # Native "4sl" yields 16 bytes on LP64 platforms.
            mreq = struct.pack("=4sl", socket.inet_aton(group), socket.INADDR_ANY)
            s.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq)

            # Wait for the sniffer to see the join packet
            sniffer.join()
            assert sniffer.correctPackets > 0

            # Now leave, check for the packet
            expected_pkt = { "type": "leave", "group": group }
            sniffer = Sniffer(expected_pkt, check_igmpv3, if1.name)
        finally:
            # Closing the socket drops the membership.  On the success path
            # this happens only after the leave sniffer has been created; on
            # failure it just makes sure the socket is not leaked.
            s.close()

        sniffer.join()
        assert sniffer.correctPackets > 0