1#!/usr/bin/env python3 2# 3# SPDX-License-Identifier: BSD-2-Clause 4# 5# Copyright (c) 2021 Rubicon Communications, LLC (Netgate) 6# 7# Redistribution and use in source and binary forms, with or without 8# modification, are permitted provided that the following conditions 9# are met: 10# 1. Redistributions of source code must retain the above copyright 11# notice, this list of conditions and the following disclaimer. 12# 2. Redistributions in binary form must reproduce the above copyright 13# notice, this list of conditions and the following disclaimer in the 14# documentation and/or other materials provided with the distribution. 15# 16# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND 17# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 18# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 19# ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE 20# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL 21# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS 22# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) 23# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 24# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY 25# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 26# SUCH DAMAGE. 
#

import argparse
import logging
# The scapy logger threshold is raised before scapy itself is imported
# (presumably to suppress warnings it emits while loading), so this call must
# stay above the scapy import.
logging.getLogger("scapy").setLevel(logging.CRITICAL)
import random
import scapy.all as sp
import socket
import sys
from sniffer import Sniffer

# Four-byte pattern that is repeated to build the echo request payload.
PAYLOAD_MAGIC = bytes.fromhex('42c0ffee')


def ping(send_if, dst_ip, args):
    """
    Send one ICMP echo request with the Don't Fragment flag set.

    send_if -- name of the interface the frame is sent on
    dst_ip  -- destination IP address of the echo request
    args    -- parsed command line arguments; args.fromaddr[0] is used as the
               source address. The randomly chosen ICMP sequence number is
               stored in args.icmp_seq so check_icmp_too_big() can compare
               it against the sequence number quoted in the ICMP error.
    """
    ether = sp.Ether()
    ip = sp.IP(dst=dst_ip, src=args.fromaddr[0])
    icmp = sp.ICMP(type='echo-request')
    # We want 1000 bytes payload, -ish (4 byte pattern repeated 250 times)
    raw = sp.raw(PAYLOAD_MAGIC * 250)

    ip.flags = 2  # Don't fragment
    icmp.seq = random.randint(0, 65535)
    args.icmp_seq = icmp.seq

    req = ether / ip / icmp / raw
    sp.sendp(req, iface=send_if, verbose=False)


def check_icmp_too_big(args, packet):
    """
    Verify that this is an ICMP packet too big error, and that the IP addresses
    in the payload packet match expectations.

    Only the ICMP type (3) is checked; the ICMP code field is not inspected.
    The packet quoted inside the error must have args.fromaddr[0] as source,
    args.to[0] as destination, and carry the ICMP sequence number recorded in
    args.icmp_seq by ping().

    Returns True if every check passes, False otherwise. Mismatches in the
    quoted packet are reported on stdout.
    """
    icmp = packet.getlayer(sp.ICMP)
    if not icmp:
        return False

    if icmp.type != 3:
        return False

    # IPerror / ICMPerror are the layers scapy uses for the packet quoted
    # inside an ICMP error message.
    quoted_ip = packet.getlayer(sp.IPerror)
    if not quoted_ip:
        return False

    if quoted_ip.src != args.fromaddr[0]:
        print("Incorrect src addr %s" % quoted_ip.src)
        return False
    if quoted_ip.dst != args.to[0]:
        print("Incorrect dst addr %s" % quoted_ip.dst)
        return False

    quoted_icmp = packet.getlayer(sp.ICMPerror)
    if not quoted_icmp:
        print("IPerror doesn't contain ICMP")
        return False
    if quoted_icmp.seq != args.icmp_seq:
        print("Incorrect icmp seq %d != %d" % (quoted_icmp.seq, args.icmp_seq))
        return False

    return True


def main():
    """
    Parse the command line, send the echo request and, if --recvif was given,
    exit with status 0 when the sniffer reports matching packets and 1 when it
    does not. Without --recvif the request is sent and nothing is checked.
    """
    parser = argparse.ArgumentParser("pft_icmp_check.py",
        description="ICMP error validation tool")
    parser.add_argument('--to', nargs=1, required=True,
        help='The destination IP address')
    parser.add_argument('--fromaddr', nargs=1, required=True,
        help='The source IP address')
    parser.add_argument('--sendif', nargs=1, required=True,
        help='The interface through which the packet(s) will be sent')
    parser.add_argument('--recvif', nargs=1,
        help='The interface on which to expect the ICMP error')

    args = parser.parse_args()

    # The sniffer is created before the request is sent.
    # NOTE(review): presumably Sniffer starts capturing on construction so the
    # ICMP error cannot be missed -- confirm against sniffer.py.
    sniffer = None
    if args.recvif is not None:
        sniffer = Sniffer(args, check_icmp_too_big, args.recvif[0])

    ping(args.sendif[0], args.to[0], args)

    # No receive interface: there is no sniffer whose result could be checked.
    if not sniffer:
        return

    sniffer.join()
    sys.exit(0 if sniffer.correctPackets else 1)


if __name__ == '__main__':
    main()