#!/usr/bin/env python3
#
# SPDX-License-Identifier: BSD-2-Clause
#
# Copyright (c) 2017 Kristof Provost <kp@FreeBSD.org>
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
# 1. Redistributions of source code must retain the above copyright
#    notice, this list of conditions and the following disclaimer.
# 2. Redistributions in binary form must reproduce the above copyright
#    notice, this list of conditions and the following disclaimer in the
#    documentation and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
# SUCH DAMAGE.
#

import argparse
import scapy.all as sp
import socket
import sys
from sniffer import Sniffer

# Payload carried by every ICMP echo request this tool sends; the check
# functions use it to tell our own packets apart from unrelated traffic.
PAYLOAD_MAGIC = bytes.fromhex('42c0ffee')

# Number of ICMP packets carrying PAYLOAD_MAGIC counted by check_dup().
dup_found = 0


def check_dup(args, packet):
    """
    Verify that this is an ICMP packet, and that we only see one

    Counts every ICMP packet whose raw payload equals PAYLOAD_MAGIC in the
    module-level dup_found counter.  Always returns False, so the packet is
    never reported as "the" match; main() inspects dup_found instead.
    (Presumably this keeps the sniffer capturing until it times out --
    confirm against the Sniffer implementation.)
    """
    global dup_found

    icmp = packet.getlayer(sp.ICMP)
    if not icmp:
        return False

    raw = packet.getlayer(sp.Raw)
    if not raw:
        return False
    if raw.load != PAYLOAD_MAGIC:
        return False

    dup_found = dup_found + 1
    return False


def check_ping_request(args, packet):
    """
    Dispatch to the IPv6 or IPv4 echo request check, depending on --ip6
    """
    if args.ip6:
        return check_ping6_request(args, packet)
    return check_ping4_request(args, packet)


def check_ping4_request(args, packet):
    """
    Verify that the packet matches what we'd have sent

    Returns True only for an IPv4 ICMP echo request addressed to args.to[0]
    that carries PAYLOAD_MAGIC and, when --expect-tos was given, has the
    expected ToS value.
    """
    dst_ip = args.to[0]

    ip = packet.getlayer(sp.IP)
    if not ip:
        return False
    if ip.dst != dst_ip:
        return False

    icmp = packet.getlayer(sp.ICMP)
    if not icmp:
        return False
    # .get() rather than indexing: an ICMP type that is not in scapy's table
    # is simply "not an echo request", not a KeyError.
    if sp.icmptypes.get(icmp.type) != 'echo-request':
        return False

    raw = packet.getlayer(sp.Raw)
    if not raw:
        return False
    if raw.load != PAYLOAD_MAGIC:
        return False

    # Wait to check expectations until we've established this is the packet we
    # sent.
    if args.expect_tos:
        expected_tos = int(args.expect_tos[0])
        if ip.tos != expected_tos:
            print("Unexpected ToS value %d, expected %d"
                  % (ip.tos, expected_tos))
            return False

    return True


def check_ping6_request(args, packet):
    """
    Verify that the packet matches what we'd have sent

    Returns True only for an ICMPv6 echo request addressed to args.to[0]
    whose data equals PAYLOAD_MAGIC.
    """
    dst_ip = args.to[0]

    ip = packet.getlayer(sp.IPv6)
    if not ip:
        return False
    if ip.dst != dst_ip:
        return False

    icmp = packet.getlayer(sp.ICMPv6EchoRequest)
    if not icmp:
        return False
    if icmp.data != PAYLOAD_MAGIC:
        return False

    return True


def check_ping_reply(args, packet):
    """
    Dispatch to the IPv6 or IPv4 echo reply check, depending on --ip6
    """
    if args.ip6:
        return check_ping6_reply(args, packet)
    return check_ping4_reply(args, packet)


def check_ping4_reply(args, packet):
    """
    Check that this is a reply to the ping request we sent

    Returns True only for an IPv4 ICMP echo reply coming from args.to[0]
    that carries PAYLOAD_MAGIC.
    """
    dst_ip = args.to[0]

    ip = packet.getlayer(sp.IP)
    if not ip:
        return False
    # The reply must originate from the address we pinged.
    if ip.src != dst_ip:
        return False

    icmp = packet.getlayer(sp.ICMP)
    if not icmp:
        return False
    # .get() rather than indexing: an ICMP type that is not in scapy's table
    # is simply "not an echo reply", not a KeyError.
    if sp.icmptypes.get(icmp.type) != 'echo-reply':
        return False

    raw = packet.getlayer(sp.Raw)
    if not raw:
        return False
    if raw.load != PAYLOAD_MAGIC:
        return False

    return True


def check_ping6_reply(args, packet):
    """
    Check that this is a reply to the ping request we sent

    Returns True only for an ICMPv6 echo reply coming from args.to[0] whose
    data equals PAYLOAD_MAGIC.  Prints a short diagnostic when an IPv6
    packet from the target fails either of the ICMPv6 checks.
    """
    dst_ip = args.to[0]

    ip = packet.getlayer(sp.IPv6)
    if not ip:
        return False
    # The reply must originate from the address we pinged.
    if ip.src != dst_ip:
        return False

    icmp = packet.getlayer(sp.ICMPv6EchoReply)
    if not icmp:
        print("No echo reply!")
        return False

    if icmp.data != PAYLOAD_MAGIC:
        print("data mismatch")
        return False

    return True


def ping(send_if, dst_ip, args):
    """
    Send one IPv4 ICMP echo request carrying PAYLOAD_MAGIC

    send_if is the interface to transmit on and dst_ip the destination
    address.  --send-tos and --fromaddr, when given, override the ToS field
    and the source address of the packet.
    """
    ether = sp.Ether()
    ip = sp.IP(dst=dst_ip)
    icmp = sp.ICMP(type='echo-request')
    raw = sp.raw(PAYLOAD_MAGIC)

    if args.send_tos:
        ip.tos = int(args.send_tos[0])

    if args.fromaddr:
        ip.src = args.fromaddr[0]

    req = ether / ip / icmp / raw
    sp.sendp(req, iface=send_if, verbose=False)


def ping6(send_if, dst_ip, args):
    """
    Send one ICMPv6 echo request carrying PAYLOAD_MAGIC

    send_if is the interface to transmit on and dst_ip the destination
    address.  --fromaddr, when given, overrides the source address.
    """
    ether = sp.Ether()
    ip6 = sp.IPv6(dst=dst_ip)
    icmp = sp.ICMPv6EchoRequest(data=sp.raw(PAYLOAD_MAGIC))

    if args.fromaddr:
        # The source address belongs on the IPv6 header built above.
        ip6.src = args.fromaddr[0]

    req = ether / ip6 / icmp
    sp.sendp(req, iface=send_if, verbose=False)


def check_tcpsyn(args, packet):
    """
    Verify that this is a TCP packet to our target with valid checksums

    Returns True only for an IPv4 TCP packet addressed to args.to[0] whose
    IP and TCP checksums both match the values scapy recomputes.  Note that
    the checksum fields of the passed-in packet are cleared in the process.
    """
    dst_ip = args.to[0]

    ip = packet.getlayer(sp.IP)
    if not ip:
        return False
    if ip.dst != dst_ip:
        return False

    tcp = packet.getlayer(sp.TCP)
    if not tcp:
        return False

    # Verify IP checksum: clear the field so scapy recomputes it when the
    # layer is rebuilt, then compare with what was on the wire.
    chksum = ip.chksum
    ip.chksum = None
    new_chksum = sp.IP(sp.raw(ip)).chksum
    if chksum != new_chksum:
        print("Expected IP checksum %x but found %x\n" % (new_chksum, chksum))
        return False

    # Verify TCP checksum: same approach, but the whole frame is rebuilt
    # from the Ethernet layer down.
    chksum = tcp.chksum
    tcp.chksum = None
    newpacket = sp.Ether(sp.raw(packet[sp.Ether]))
    new_chksum = newpacket[sp.TCP].chksum
    if chksum != new_chksum:
        print("Expected TCP checksum %x but found %x\n" % (new_chksum, chksum))
        return False

    return True


def tcpsyn(send_if, dst_ip, args):
    """
    Send one TCP SYN to port 666 of dst_ip through send_if

    The SYN carries Timestamp and MSS options; --tcpopt_unaligned prepends
    a single NOP option to them.
    """
    opts = [('Timestamp', (1, 1)), ('MSS', 1280)]

    if args.tcpopt_unaligned:
        opts = [('NOP', 0)] + opts

    ether = sp.Ether()
    ip = sp.IP(dst=dst_ip)
    tcp = sp.TCP(dport=666, flags='S', options=opts)

    req = ether / ip / tcp
    sp.sendp(req, iface=send_if, verbose=False)


def main():
    """
    Parse the command line, send the test packet and report the result

    Sniffers are started on the requested interfaces before the packet is
    sent.  The process exits with status 0 when the expectations are met
    and 1 otherwise.
    """
    parser = argparse.ArgumentParser("pft_ping.py",
        description="Ping test tool")
    parser.add_argument('--sendif', nargs=1,
        required=True,
        help='The interface through which the packet(s) will be sent')
    parser.add_argument('--recvif', nargs=1,
        help='The interface on which to expect the ICMP echo request')
    parser.add_argument('--replyif', nargs=1,
        help='The interface on which to expect the ICMP echo response')
    parser.add_argument('--checkdup', nargs=1,
        help='The interface on which to expect the duplicated ICMP packets')
    parser.add_argument('--ip6', action='store_true',
        help='Use IPv6')
    parser.add_argument('--to', nargs=1,
        required=True,
        help='The destination IP address for the ICMP echo request')
    parser.add_argument('--fromaddr', nargs=1,
        help='The source IP address for the ICMP echo request')

    # TCP options
    parser.add_argument('--tcpsyn', action='store_true',
        help='Send a TCP SYN packet')
    parser.add_argument('--tcpopt_unaligned', action='store_true',
        help='Include unaligned TCP options')

    # Packet settings
    parser.add_argument('--send-tos', nargs=1,
        help='Set the ToS value for the transmitted packet')

    # Expectations
    parser.add_argument('--expect-tos', nargs=1,
        help='The expected ToS value in the received packet')

    args = parser.parse_args()

    # We may not have a default route. Tell scapy where to start looking for routes
    sp.conf.iface6 = args.sendif[0]

    # Create the sniffers before sending anything.
    sniffer = None
    if args.recvif is not None:
        checkfn = check_tcpsyn if args.tcpsyn else check_ping_request
        sniffer = Sniffer(args, checkfn)

    replysniffer = None
    if args.replyif is not None:
        replysniffer = Sniffer(args, check_ping_reply, recvif=args.replyif[0])

    dupsniffer = None
    if args.checkdup is not None:
        dupsniffer = Sniffer(args, check_dup, recvif=args.checkdup[0])

    if args.tcpsyn:
        tcpsyn(args.sendif[0], args.to[0], args)
    elif args.ip6:
        ping6(args.sendif[0], args.to[0], args)
    else:
        ping(args.sendif[0], args.to[0], args)

    if dupsniffer:
        dupsniffer.join()
        # Exactly one copy of our packet must have been counted.
        if dup_found != 1:
            sys.exit(1)

    # NOTE(review): when both --recvif and --replyif are given, the request
    # sniffer alone decides the exit status and the reply sniffer's result
    # is never consulted.  Kept as-is so existing callers see no change.
    if sniffer:
        sniffer.join()
        sys.exit(0 if sniffer.foundCorrectPacket else 1)

    if replysniffer:
        replysniffer.join()
        sys.exit(0 if replysniffer.foundCorrectPacket else 1)


if __name__ == '__main__':
    main()