#!/usr/bin/env python
#
# SPDX-License-Identifier: BSD-2-Clause
#
# Copyright (c) 2017 Kristof Provost <kp@FreeBSD.org>
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
# 1. Redistributions of source code must retain the above copyright
#    notice, this list of conditions and the following disclaimer.
# 2. Redistributions in binary form must reproduce the above copyright
#    notice, this list of conditions and the following disclaimer in the
#    documentation and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
# SUCH DAMAGE.
#

"""Send a test packet and optionally verify that it shows up elsewhere.

The script transmits either an ICMP echo request (IPv4 or IPv6) or a TCP SYN
through ``--sendif``.  When ``--recvif`` is given, a ``Sniffer`` (from the
local ``sniffer`` module) is created with the parsed arguments and a check
function; after sending, the script waits for the sniffer and exits with
status 0 if ``sniffer.foundCorrectPacket`` is true and 1 otherwise.
"""

import argparse
import socket
import sys

import scapy.all as sp

from sniffer import Sniffer

# Payload carried by the echo requests we send; used to recognise our own
# packets among whatever else is seen by the check functions.
PAYLOAD_MAGIC = bytes.fromhex('42c0ffee')


def check_ping_request(args, packet):
    """
    Check an echo request using the IPv6 or IPv4 checker, per ``args.ip6``.

    Returns the result of the selected checker (True on match).
    """
    if args.ip6:
        return check_ping6_request(args, packet)
    return check_ping4_request(args, packet)


def check_ping4_request(args, packet):
    """
    Verify that the packet matches what we'd have sent

    The packet must be IPv4 addressed to ``args.to[0]``, be an ICMP
    echo-request and carry exactly ``PAYLOAD_MAGIC`` as raw payload.  If
    ``args.expect_tos`` is set, the IP ToS field must equal it as well;
    a mismatch is reported on stdout.

    Returns True if every check passes, False otherwise.
    """
    dst_ip = args.to[0]

    ip = packet.getlayer(sp.IP)
    if not ip:
        return False
    if ip.dst != dst_ip:
        return False

    icmp = packet.getlayer(sp.ICMP)
    if not icmp:
        return False
    # Use .get(): an ICMP type that is missing from scapy's name table must
    # simply fail the match rather than raise KeyError.
    if sp.icmptypes.get(icmp.type) != 'echo-request':
        return False

    raw = packet.getlayer(sp.Raw)
    if not raw:
        return False
    if raw.load != PAYLOAD_MAGIC:
        return False

    # Wait to check expectations until we've established this is the packet we
    # sent.
    if args.expect_tos:
        expected_tos = int(args.expect_tos[0])
        if ip.tos != expected_tos:
            print("Unexpected ToS value %d, expected %d"
                  % (ip.tos, expected_tos))
            return False

    return True


def check_ping6_request(args, packet):
    """
    Verify that the packet matches what we'd have sent

    The packet must be IPv6 addressed to ``args.to[0]`` and contain an
    ICMPv6 echo request whose data equals ``PAYLOAD_MAGIC``.

    Returns True if every check passes, False otherwise.
    """
    dst_ip = args.to[0]

    ip = packet.getlayer(sp.IPv6)
    if not ip:
        return False
    if ip.dst != dst_ip:
        return False

    icmp = packet.getlayer(sp.ICMPv6EchoRequest)
    if not icmp:
        return False
    if icmp.data != PAYLOAD_MAGIC:
        return False

    return True


def ping(send_if, dst_ip, args):
    """
    Send one IPv4 ICMP echo request carrying ``PAYLOAD_MAGIC``.

    send_if -- name of the interface to transmit on
    dst_ip  -- destination IPv4 address
    args    -- parsed arguments; ``args.send_tos`` (if set) supplies the
               ToS value placed in the IP header
    """
    ether = sp.Ether()
    ip = sp.IP(dst=dst_ip)
    icmp = sp.ICMP(type='echo-request')
    raw = sp.raw(PAYLOAD_MAGIC)

    if args.send_tos:
        ip.tos = int(args.send_tos[0])

    req = ether / ip / icmp / raw
    sp.sendp(req, iface=send_if, verbose=False)


def ping6(send_if, dst_ip, args):
    """
    Send one ICMPv6 echo request carrying ``PAYLOAD_MAGIC``.

    send_if -- name of the interface to transmit on
    dst_ip  -- destination IPv6 address
    args    -- parsed arguments (not used by this function)
    """
    ether = sp.Ether()
    ip6 = sp.IPv6(dst=dst_ip)
    icmp = sp.ICMPv6EchoRequest(data=sp.raw(PAYLOAD_MAGIC))

    req = ether / ip6 / icmp
    sp.sendp(req, iface=send_if, verbose=False)


def check_tcpsyn(args, packet):
    """
    Verify a TCP packet addressed to ``args.to[0]`` has valid checksums.

    The packet must contain an IPv4 layer with the expected destination and
    a TCP layer.  Both the IP and the TCP checksum are compared against the
    value obtained by clearing the field and rebuilding the packet; a
    mismatch is reported on stdout.

    Note that this clears the ``chksum`` fields of the IP and TCP layers of
    ``packet`` as a side effect.

    Returns True if both checksums match, False otherwise.
    """
    dst_ip = args.to[0]

    ip = packet.getlayer(sp.IP)
    if not ip:
        return False
    if ip.dst != dst_ip:
        return False

    tcp = packet.getlayer(sp.TCP)
    if not tcp:
        return False

    # Verify IP checksum: clear the field, then re-dissect the rebuilt bytes
    # to obtain the checksum the packet should have had.
    chksum = ip.chksum
    ip.chksum = None
    new_chksum = sp.IP(sp.raw(ip)).chksum
    if chksum != new_chksum:
        print("Expected IP checksum %x but found %x\n" % (new_chksum, chksum))
        return False

    # Verify TCP checksum the same way, rebuilding from the Ethernet layer
    # down.
    chksum = tcp.chksum
    tcp.chksum = None
    newpacket = sp.Ether(sp.raw(packet[sp.Ether]))
    new_chksum = newpacket[sp.TCP].chksum
    if chksum != new_chksum:
        print("Expected TCP checksum %x but found %x\n" % (new_chksum, chksum))
        return False

    return True


def tcpsyn(send_if, dst_ip, args):
    """
    Send one IPv4 TCP SYN to port 666 with Timestamp and MSS options.

    send_if -- name of the interface to transmit on
    dst_ip  -- destination IPv4 address
    args    -- parsed arguments; when ``args.tcpopt_unaligned`` is set a
               single NOP option is placed in front of the other options
    """
    opts = [('Timestamp', (1, 1)), ('MSS', 1280)]

    if args.tcpopt_unaligned:
        # NOTE(review): the leading NOP presumably shifts the following
        # options off their usual alignment -- confirm against the test
        # that uses this flag.
        opts = [('NOP', 0)] + opts

    ether = sp.Ether()
    ip = sp.IP(dst=dst_ip)
    tcp = sp.TCP(dport=666, flags='S', options=opts)

    req = ether / ip / tcp
    sp.sendp(req, iface=send_if, verbose=False)


def main():
    """
    Parse the command line, send the requested packet and report the result.

    If ``--recvif`` was given, the process exits with status 0 when the
    sniffer reports ``foundCorrectPacket`` and with status 1 otherwise.
    Without ``--recvif`` the function simply returns after sending.
    """
    parser = argparse.ArgumentParser("pft_ping.py",
        description="Ping test tool")
    parser.add_argument('--sendif', nargs=1,
        required=True,
        help='The interface through which the packet(s) will be sent')
    parser.add_argument('--recvif', nargs=1,
        help='The interface on which to expect the ICMP echo response')
    parser.add_argument('--ip6', action='store_true',
        help='Use IPv6')
    parser.add_argument('--to', nargs=1,
        required=True,
        help='The destination IP address for the ICMP echo request')

    # TCP options
    parser.add_argument('--tcpsyn', action='store_true',
        help='Send a TCP SYN packet')
    parser.add_argument('--tcpopt_unaligned', action='store_true',
        help='Include unaligned TCP options')

    # Packet settings
    parser.add_argument('--send-tos', nargs=1,
        help='Set the ToS value for the transmitted packet')

    # Expectations
    parser.add_argument('--expect-tos', nargs=1,
        help='The expected ToS value in the received packet')

    args = parser.parse_args()

    # We may not have a default route. Tell scapy where to start looking for routes
    sp.conf.iface6 = args.sendif[0]

    # Create the sniffer before transmitting so it exists by the time the
    # packet is sent.
    sniffer = None
    if args.recvif is not None:
        checkfn = check_tcpsyn if args.tcpsyn else check_ping_request
        sniffer = Sniffer(args, checkfn)

    if args.tcpsyn:
        tcpsyn(args.sendif[0], args.to[0], args)
    elif args.ip6:
        ping6(args.sendif[0], args.to[0], args)
    else:
        ping(args.sendif[0], args.to[0], args)

    if sniffer:
        sniffer.join()

        if sniffer.foundCorrectPacket:
            sys.exit(0)
        else:
            sys.exit(1)


if __name__ == '__main__':
    main()