#!/usr/bin/env python
#
# SPDX-License-Identifier: BSD-2-Clause
#
# Copyright (c) 2017 Kristof Provost <kp@FreeBSD.org>
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
# 1. Redistributions of source code must retain the above copyright
#    notice, this list of conditions and the following disclaimer.
# 2. Redistributions in binary form must reproduce the above copyright
#    notice, this list of conditions and the following disclaimer in the
#    documentation and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
# SUCH DAMAGE.
#

import argparse
import scapy.all as sp
import socket
import sys
from sniffer import Sniffer

# Payload carried by every echo request this tool sends; the check functions
# use it to recognise our own packets among unrelated traffic.
PAYLOAD_MAGIC = bytes.fromhex('42c0ffee')

# Number of ICMP packets carrying PAYLOAD_MAGIC seen by check_dup().
dup_found = 0


def check_dup(args, packet):
    """
    Count ICMP packets that carry our payload.

    Every ICMP packet whose Raw payload equals PAYLOAD_MAGIC increments the
    module-level dup_found counter; main() then requires the counter to be
    exactly one.

    args is unused; it is accepted so the signature matches the other
    check functions handed to Sniffer.

    Always returns False: matches are only counted here, never reported as
    "found". NOTE(review): presumably this keeps the sniffer capturing until
    it gives up on its own -- confirm against sniffer.py.
    """
    global dup_found

    icmp = packet.getlayer(sp.ICMP)
    if not icmp:
        return False

    raw = packet.getlayer(sp.Raw)
    if not raw:
        return False
    if raw.load != PAYLOAD_MAGIC:
        return False

    dup_found = dup_found + 1
    return False


def check_ping_request(args, packet):
    """
    Dispatch to the IPv6 or IPv4 echo request check, depending on args.ip6.
    """
    if args.ip6:
        return check_ping6_request(args, packet)
    return check_ping4_request(args, packet)


def check_ping4_request(args, packet):
    """
    Verify that the packet matches the IPv4 echo request we'd have sent.

    The packet must be addressed to args.to[0], be an ICMP echo-request and
    carry PAYLOAD_MAGIC. If args.expect_tos is set, the IP ToS field must
    also equal that value.

    Returns True if everything matches, False otherwise.
    """
    dst_ip = args.to[0]

    ip = packet.getlayer(sp.IP)
    if not ip:
        return False
    if ip.dst != dst_ip:
        return False

    icmp = packet.getlayer(sp.ICMP)
    if not icmp:
        return False
    if sp.icmptypes[icmp.type] != 'echo-request':
        return False

    raw = packet.getlayer(sp.Raw)
    if not raw:
        return False
    if raw.load != PAYLOAD_MAGIC:
        return False

    # Wait to check expectations until we've established this is the packet
    # we sent.
    if args.expect_tos:
        expected_tos = int(args.expect_tos[0])
        if ip.tos != expected_tos:
            print("Unexpected ToS value %d, expected %d"
                  % (ip.tos, expected_tos))
            return False

    return True


def check_ping6_request(args, packet):
    """
    Verify that the packet matches the IPv6 echo request we'd have sent.

    The packet must be addressed to args.to[0] and be an ICMPv6 echo request
    whose data equals PAYLOAD_MAGIC.

    Returns True if everything matches, False otherwise.
    """
    dst_ip = args.to[0]

    ip = packet.getlayer(sp.IPv6)
    if not ip:
        return False
    if ip.dst != dst_ip:
        return False

    icmp = packet.getlayer(sp.ICMPv6EchoRequest)
    if not icmp:
        return False
    if icmp.data != PAYLOAD_MAGIC:
        return False

    return True


def ping(send_if, dst_ip, args):
    """
    Send one IPv4 ICMP echo request carrying PAYLOAD_MAGIC to dst_ip out of
    interface send_if. If args.send_tos is set it is used as the IP ToS.
    """
    ether = sp.Ether()
    ip = sp.IP(dst=dst_ip)
    icmp = sp.ICMP(type='echo-request')
    raw = sp.raw(PAYLOAD_MAGIC)

    if args.send_tos:
        ip.tos = int(args.send_tos[0])

    req = ether / ip / icmp / raw
    sp.sendp(req, iface=send_if, verbose=False)


def ping6(send_if, dst_ip, args):
    """
    Send one ICMPv6 echo request carrying PAYLOAD_MAGIC to dst_ip out of
    interface send_if. args is accepted for symmetry with ping() and unused.
    """
    ether = sp.Ether()
    ip6 = sp.IPv6(dst=dst_ip)
    icmp = sp.ICMPv6EchoRequest(data=sp.raw(PAYLOAD_MAGIC))

    req = ether / ip6 / icmp
    sp.sendp(req, iface=send_if, verbose=False)


def check_tcpsyn(args, packet):
    """
    Verify that the packet is a TCP packet to args.to[0] whose IP and TCP
    checksums are both correct.

    Each checksum is verified by clearing the field, rebuilding the packet
    from its raw bytes and comparing the checksum found in the rebuilt
    packet with the one that was received. Note that this clears the
    checksum fields of the packet passed in.

    Returns True if the packet matches and both checksums are valid, False
    otherwise (after printing which checksum was wrong).
    """
    dst_ip = args.to[0]

    ip = packet.getlayer(sp.IP)
    if not ip:
        return False
    if ip.dst != dst_ip:
        return False

    tcp = packet.getlayer(sp.TCP)
    if not tcp:
        return False

    # Verify IP checksum
    chksum = ip.chksum
    ip.chksum = None
    new_chksum = sp.IP(sp.raw(ip)).chksum
    if chksum != new_chksum:
        print("Expected IP checksum %x but found %x\n" % (new_chksum, chksum))
        return False

    # Verify TCP checksum
    chksum = tcp.chksum
    tcp.chksum = None
    newpacket = sp.Ether(sp.raw(packet[sp.Ether]))
    new_chksum = newpacket[sp.TCP].chksum
    if chksum != new_chksum:
        print("Expected TCP checksum %x but found %x\n" % (new_chksum, chksum))
        return False

    return True


def tcpsyn(send_if, dst_ip, args):
    """
    Send one TCP SYN to port 666 on dst_ip out of interface send_if.

    The SYN carries Timestamp and MSS options; with args.tcpopt_unaligned a
    single NOP is put in front of them.
    """
    opts = [('Timestamp', (1, 1)), ('MSS', 1280)]

    if args.tcpopt_unaligned:
        opts = [('NOP', 0)] + opts

    ether = sp.Ether()
    ip = sp.IP(dst=dst_ip)
    tcp = sp.TCP(dport=666, flags='S', options=opts)

    req = ether / ip / tcp
    sp.sendp(req, iface=send_if, verbose=False)


def main():
    """
    Parse the command line, send the requested packet and check the result.

    Exits with status 1 if --checkdup was given and the number of matching
    ICMP packets seen on that interface is not exactly one. Otherwise, if
    --recvif was given, exits with status 0 when the receive sniffer found
    the expected packet and 1 when it did not. Without either option the
    function simply returns after sending.
    """
    parser = argparse.ArgumentParser("pft_ping.py",
        description="Ping test tool")
    parser.add_argument('--sendif', nargs=1,
        required=True,
        help='The interface through which the packet(s) will be sent')
    parser.add_argument('--recvif', nargs=1,
        help='The interface on which to expect the ICMP echo response')
    parser.add_argument('--checkdup', nargs=1,
        help='The interface on which to expect the duplicated ICMP packets')
    parser.add_argument('--ip6', action='store_true',
        help='Use IPv6')
    parser.add_argument('--to', nargs=1,
        required=True,
        help='The destination IP address for the ICMP echo request')

    # TCP options
    parser.add_argument('--tcpsyn', action='store_true',
        help='Send a TCP SYN packet')
    parser.add_argument('--tcpopt_unaligned', action='store_true',
        help='Include unaligned TCP options')

    # Packet settings
    parser.add_argument('--send-tos', nargs=1,
        help='Set the ToS value for the transmitted packet')

    # Expectations
    parser.add_argument('--expect-tos', nargs=1,
        help='The expected ToS value in the received packet')

    args = parser.parse_args()

    # We may not have a default route. Tell scapy where to start looking for
    # routes
    sp.conf.iface6 = args.sendif[0]

    # The sniffers are created before anything is sent.
    # NOTE(review): presumably Sniffer starts capturing on construction so
    # the packet cannot be missed -- confirm against sniffer.py.
    sniffer = None
    if args.recvif is not None:
        checkfn = check_ping_request
        if args.tcpsyn:
            checkfn = check_tcpsyn

        sniffer = Sniffer(args, checkfn)

    dupsniffer = None
    if args.checkdup is not None:
        dupsniffer = Sniffer(args, check_dup, recvif=args.checkdup[0])

    if args.tcpsyn:
        tcpsyn(args.sendif[0], args.to[0], args)
    else:
        if args.ip6:
            ping6(args.sendif[0], args.to[0], args)
        else:
            ping(args.sendif[0], args.to[0], args)

    if dupsniffer:
        dupsniffer.join()
        if dup_found != 1:
            sys.exit(1)

    if sniffer:
        sniffer.join()

        if sniffer.foundCorrectPacket:
            sys.exit(0)
        else:
            sys.exit(1)


if __name__ == '__main__':
    main()