1#!/usr/bin/env python3 2# 3# SPDX-License-Identifier: BSD-2-Clause 4# 5# Copyright (c) 2017 Kristof Provost <kp@FreeBSD.org> 6# 7# Redistribution and use in source and binary forms, with or without 8# modification, are permitted provided that the following conditions 9# are met: 10# 1. Redistributions of source code must retain the above copyright 11# notice, this list of conditions and the following disclaimer. 12# 2. Redistributions in binary form must reproduce the above copyright 13# notice, this list of conditions and the following disclaimer in the 14# documentation and/or other materials provided with the distribution. 15# 16# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND 17# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 18# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 19# ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE 20# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL 21# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS 22# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) 23# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 24# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY 25# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 26# SUCH DAMAGE. 
#

import argparse
import socket
import sys

import scapy.all as sp

from sniffer import Sniffer

# Payload carried by every probe we send, used to recognise our own packets.
PAYLOAD_MAGIC = bytes.fromhex('42c0ffee')

# Number of ICMP packets carrying PAYLOAD_MAGIC counted by check_dup().
dup_found = 0


def check_dup(args, packet):
    """
    Count ICMP packets that carry our payload.

    This callback always returns False, so it never reports a match itself;
    its result is the module-level dup_found counter, which main() compares
    against 1 to verify that we only saw a single copy of the packet.
    """
    global dup_found

    icmp = packet.getlayer(sp.ICMP)
    if not icmp:
        return False

    raw = packet.getlayer(sp.Raw)
    if not raw:
        return False
    if raw.load != PAYLOAD_MAGIC:
        return False

    dup_found += 1
    return False


def check_ping_request(args, packet):
    """
    Dispatch to the IPv6 or IPv4 echo request check, depending on --ip6
    """
    if args.ip6:
        return check_ping6_request(args, packet)
    return check_ping4_request(args, packet)


def check_ping4_request(args, packet):
    """
    Verify that the packet matches what we'd have sent

    Returns True only for an IPv4 ICMP echo request addressed to args.to[0]
    that carries PAYLOAD_MAGIC and, if --expect-tos was given, has the
    expected ToS value.
    """
    dst_ip = args.to[0]

    ip = packet.getlayer(sp.IP)
    if not ip:
        return False
    if ip.dst != dst_ip:
        return False

    icmp = packet.getlayer(sp.ICMP)
    if not icmp:
        return False
    # Use .get() so an ICMP type missing from scapy's table is simply "not
    # our packet" rather than a KeyError in the sniffer callback.
    if sp.icmptypes.get(icmp.type) != 'echo-request':
        return False

    raw = packet.getlayer(sp.Raw)
    if not raw:
        return False
    if raw.load != PAYLOAD_MAGIC:
        return False

    # Wait to check expectations until we've established this is the packet
    # we sent.
    if args.expect_tos:
        expected_tos = int(args.expect_tos[0])
        if ip.tos != expected_tos:
            print("Unexpected ToS value %d, expected %d"
                  % (ip.tos, expected_tos))
            return False

    return True


def check_ping6_request(args, packet):
    """
    Verify that the packet matches what we'd have sent

    Returns True only for an ICMPv6 echo request addressed to args.to[0]
    whose data is PAYLOAD_MAGIC.
    """
    dst_ip = args.to[0]

    ip = packet.getlayer(sp.IPv6)
    if not ip:
        return False
    if ip.dst != dst_ip:
        return False

    icmp = packet.getlayer(sp.ICMPv6EchoRequest)
    if not icmp:
        return False
    if icmp.data != PAYLOAD_MAGIC:
        return False

    return True


def check_ping_reply(args, packet):
    """
    Dispatch to the IPv6 or IPv4 echo reply check, depending on --ip6
    """
    if args.ip6:
        return check_ping6_reply(args, packet)
    return check_ping4_reply(args, packet)


def check_ping4_reply(args, packet):
    """
    Check that this is a reply to the ping request we sent

    Returns True only for an IPv4 ICMP echo reply coming from args.to[0]
    that carries PAYLOAD_MAGIC.
    """
    dst_ip = args.to[0]

    ip = packet.getlayer(sp.IP)
    if not ip:
        return False
    if ip.src != dst_ip:
        return False

    icmp = packet.getlayer(sp.ICMP)
    if not icmp:
        return False
    # See check_ping4_request(): tolerate ICMP types scapy has no name for.
    if sp.icmptypes.get(icmp.type) != 'echo-reply':
        return False

    raw = packet.getlayer(sp.Raw)
    if not raw:
        return False
    if raw.load != PAYLOAD_MAGIC:
        return False

    return True


def check_ping6_reply(args, packet):
    """
    Check that this is a reply to the ping6 request we sent

    Returns True only for an ICMPv6 echo reply coming from args.to[0] whose
    data is PAYLOAD_MAGIC.
    """
    dst_ip = args.to[0]

    ip = packet.getlayer(sp.IPv6)
    if not ip:
        return False
    if ip.src != dst_ip:
        return False

    icmp = packet.getlayer(sp.ICMPv6EchoReply)
    if not icmp:
        return False
    if icmp.data != PAYLOAD_MAGIC:
        return False

    return True


def ping(send_if, dst_ip, args):
    """
    Send a single IPv4 ICMP echo request carrying PAYLOAD_MAGIC

    send_if is the interface to transmit on and dst_ip the destination
    address. --send-tos and --fromaddr, when given, set the ToS value and
    the source address of the packet.
    """
    ether = sp.Ether()
    ip = sp.IP(dst=dst_ip)
    icmp = sp.ICMP(type='echo-request')
    raw = sp.raw(PAYLOAD_MAGIC)

    if args.send_tos:
        ip.tos = int(args.send_tos[0])

    if args.fromaddr:
        ip.src = args.fromaddr[0]

    req = ether / ip / icmp / raw
    sp.sendp(req, iface=send_if, verbose=False)


def ping6(send_if, dst_ip, args):
    """
    Send a single ICMPv6 echo request carrying PAYLOAD_MAGIC

    send_if is the interface to transmit on and dst_ip the destination
    address. --fromaddr, when given, sets the source address of the packet.
    """
    ether = sp.Ether()
    ip6 = sp.IPv6(dst=dst_ip)
    icmp = sp.ICMPv6EchoRequest(data=sp.raw(PAYLOAD_MAGIC))

    if args.fromaddr:
        # The IPv6 header lives in ip6; there is no 'ip' in this function.
        ip6.src = args.fromaddr[0]

    req = ether / ip6 / icmp
    sp.sendp(req, iface=send_if, verbose=False)


def check_tcpsyn(args, packet):
    """
    Verify that this is a TCP packet to args.to[0] with valid checksums

    Both the IP and the TCP checksum are compared against the value scapy
    computes for the same packet. Note that the checksum fields of the
    passed packet are left set to None afterwards.
    """
    dst_ip = args.to[0]

    ip = packet.getlayer(sp.IP)
    if not ip:
        return False
    if ip.dst != dst_ip:
        return False

    tcp = packet.getlayer(sp.TCP)
    if not tcp:
        return False

    # Verify IP checksum: clear the field and re-parse the rebuilt bytes to
    # read back the checksum scapy calculates.
    chksum = ip.chksum
    ip.chksum = None
    new_chksum = sp.IP(sp.raw(ip)).chksum
    if chksum != new_chksum:
        print("Expected IP checksum %x but found %x\n" % (new_chksum, chksum))
        return False

    # Verify TCP checksum, using the same rebuild-and-reparse approach on
    # the whole frame.
    chksum = tcp.chksum
    tcp.chksum = None
    newpacket = sp.Ether(sp.raw(packet[sp.Ether]))
    new_chksum = newpacket[sp.TCP].chksum
    if chksum != new_chksum:
        print("Expected TCP checksum %x but found %x\n" % (new_chksum, chksum))
        return False

    return True


def tcpsyn(send_if, dst_ip, args):
    """
    Send a single TCP SYN to port 666 on dst_ip through send_if

    With --tcpopt_unaligned a NOP option is put in front of the Timestamp
    and MSS options.
    """
    opts = [('Timestamp', (1, 1)), ('MSS', 1280)]

    if args.tcpopt_unaligned:
        opts = [('NOP', 0)] + opts

    ether = sp.Ether()
    ip = sp.IP(dst=dst_ip)
    tcp = sp.TCP(dport=666, flags='S', options=opts)

    req = ether / ip / tcp
    sp.sendp(req, iface=send_if, verbose=False)


def main():
    """
    Parse the command line, start the requested sniffers, send the probe
    packet and exit with 0 on success or 1 on failure.
    """
    parser = argparse.ArgumentParser("pft_ping.py",
        description="Ping test tool")
    parser.add_argument('--sendif', nargs=1,
        required=True,
        help='The interface through which the packet(s) will be sent')
    parser.add_argument('--recvif', nargs=1,
        help='The interface on which to expect the ICMP echo request')
    parser.add_argument('--replyif', nargs=1,
        help='The interface on which to expect the ICMP echo response')
    parser.add_argument('--checkdup', nargs=1,
        help='The interface on which to expect the duplicated ICMP packets')
    parser.add_argument('--ip6', action='store_true',
        help='Use IPv6')
    parser.add_argument('--to', nargs=1,
        required=True,
        help='The destination IP address for the ICMP echo request')
    parser.add_argument('--fromaddr', nargs=1,
        help='The source IP address for the ICMP echo request')

    # TCP options
    parser.add_argument('--tcpsyn', action='store_true',
        help='Send a TCP SYN packet')
    parser.add_argument('--tcpopt_unaligned', action='store_true',
        help='Include unaligned TCP options')

    # Packet settings
    parser.add_argument('--send-tos', nargs=1,
        help='Set the ToS value for the transmitted packet')

    # Expectations
    parser.add_argument('--expect-tos', nargs=1,
        help='The expected ToS value in the received packet')

    args = parser.parse_args()

    # We may not have a default route. Tell scapy where to start looking for
    # routes
    sp.conf.iface6 = args.sendif[0]

    # Sniffers are created before anything is sent.
    sniffer = None
    if args.recvif is not None:
        checkfn = check_tcpsyn if args.tcpsyn else check_ping_request
        sniffer = Sniffer(args, checkfn)

    replysniffer = None
    if args.replyif is not None:
        replysniffer = Sniffer(args, check_ping_reply,
            recvif=args.replyif[0])

    dupsniffer = None
    if args.checkdup is not None:
        dupsniffer = Sniffer(args, check_dup, recvif=args.checkdup[0])

    if args.tcpsyn:
        tcpsyn(args.sendif[0], args.to[0], args)
    elif args.ip6:
        ping6(args.sendif[0], args.to[0], args)
    else:
        ping(args.sendif[0], args.to[0], args)

    if dupsniffer:
        dupsniffer.join()
        # Exactly one copy of the packet must have been counted.
        if dup_found != 1:
            sys.exit(1)

    if sniffer:
        sniffer.join()

        # NOTE: we exit here whenever --recvif was given, so the --replyif
        # result below is only consulted when --recvif is absent.
        sys.exit(0 if sniffer.foundCorrectPacket else 1)

    if replysniffer:
        replysniffer.join()

        sys.exit(0 if replysniffer.foundCorrectPacket else 1)


if __name__ == '__main__':
    main()