1#!/usr/bin/env python3 2# 3# SPDX-License-Identifier: BSD-2-Clause 4# 5# Copyright (c) 2017 Kristof Provost <kp@FreeBSD.org> 6# 7# Redistribution and use in source and binary forms, with or without 8# modification, are permitted provided that the following conditions 9# are met: 10# 1. Redistributions of source code must retain the above copyright 11# notice, this list of conditions and the following disclaimer. 12# 2. Redistributions in binary form must reproduce the above copyright 13# notice, this list of conditions and the following disclaimer in the 14# documentation and/or other materials provided with the distribution. 15# 16# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND 17# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 18# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 19# ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE 20# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL 21# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS 22# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) 23# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 24# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY 25# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 26# SUCH DAMAGE. 27# 28 29import argparse 30import logging 31logging.getLogger("scapy").setLevel(logging.CRITICAL) 32import scapy.all as sp 33import socket 34import sys 35from sniffer import Sniffer 36 37PAYLOAD_MAGIC = bytes.fromhex('42c0ffee') 38 39dup_found = 0 40 41def check_dup(args, packet): 42 """ 43 Verify that this is an ICMP packet, and that we only see one 44 """ 45 global dup_found 46 47 icmp = packet.getlayer(sp.ICMP) 48 if not icmp: 49 return False 50 51 raw = packet.getlayer(sp.Raw) 52 if not raw: 53 return False 54 if raw.load != PAYLOAD_MAGIC: 55 return False 56 57 dup_found = dup_found + 1 58 return False 59 60def check_ping_request(args, packet): 61 if args.ip6: 62 return check_ping6_request(args, packet) 63 else: 64 return check_ping4_request(args, packet) 65 66def check_ping4_request(args, packet): 67 """ 68 Verify that the packet matches what we'd have sent 69 """ 70 dst_ip = args.to[0] 71 72 ip = packet.getlayer(sp.IP) 73 if not ip: 74 return False 75 if ip.dst != dst_ip: 76 return False 77 78 icmp = packet.getlayer(sp.ICMP) 79 if not icmp: 80 return False 81 if sp.icmptypes[icmp.type] != 'echo-request': 82 return False 83 84 raw = packet.getlayer(sp.Raw) 85 if not raw: 86 return False 87 if raw.load != PAYLOAD_MAGIC: 88 return False 89 90 # Wait to check expectations until we've established this is the packet we 91 # sent. 92 if args.expect_tos: 93 if ip.tos != int(args.expect_tos[0]): 94 print("Unexpected ToS value %d, expected %d" \ 95 % (ip.tos, int(args.expect_tos[0]))) 96 return False 97 98 return True 99 100def check_ping6_request(args, packet): 101 """ 102 Verify that the packet matches what we'd have sent 103 """ 104 dst_ip = args.to[0] 105 106 ip = packet.getlayer(sp.IPv6) 107 if not ip: 108 return False 109 if ip.dst != dst_ip: 110 return False 111 112 icmp = packet.getlayer(sp.ICMPv6EchoRequest) 113 if not icmp: 114 return False 115 if icmp.data != PAYLOAD_MAGIC: 116 return False 117 118 return True 119 120def check_ping_reply(args, packet): 121 if args.ip6: 122 return check_ping6_reply(args, packet) 123 else: 124 return check_ping4_reply(args, packet) 125 126def check_ping4_reply(args, packet): 127 """ 128 Check that this is a reply to the ping request we sent 129 """ 130 dst_ip = args.to[0] 131 132 ip = packet.getlayer(sp.IP) 133 if not ip: 134 return False 135 if ip.src != dst_ip: 136 return False 137 138 icmp = packet.getlayer(sp.ICMP) 139 if not icmp: 140 return False 141 if sp.icmptypes[icmp.type] != 'echo-reply': 142 return False 143 144 raw = packet.getlayer(sp.Raw) 145 if not raw: 146 return False 147 if raw.load != PAYLOAD_MAGIC: 148 return False 149 150 return True 151 152def check_ping6_reply(args, packet): 153 """ 154 Check that this is a reply to the ping request we sent 155 """ 156 dst_ip = args.to[0] 157 158 ip = packet.getlayer(sp.IPv6) 159 if not ip: 160 return False 161 if ip.src != dst_ip: 162 return False 163 164 icmp = packet.getlayer(sp.ICMPv6EchoReply) 165 if not icmp: 166 print("No echo reply!") 167 return False 168 169 if icmp.data != PAYLOAD_MAGIC: 170 print("data mismatch") 171 return False 172 173 return True 174 175def ping(send_if, dst_ip, args): 176 ether = sp.Ether() 177 ip = sp.IP(dst=dst_ip) 178 icmp = sp.ICMP(type='echo-request') 179 raw = sp.raw(PAYLOAD_MAGIC) 180 181 if args.send_tos: 182 ip.tos = int(args.send_tos[0]) 183 184 if args.fromaddr: 185 ip.src = args.fromaddr[0] 186 187 req = ether / ip / icmp / raw 188 sp.sendp(req, iface=send_if, verbose=False) 189 190def ping6(send_if, dst_ip, args): 191 ether = sp.Ether() 192 ip6 = sp.IPv6(dst=dst_ip) 193 icmp = sp.ICMPv6EchoRequest(data=sp.raw(PAYLOAD_MAGIC)) 194 195 if args.fromaddr: 196 ip.src = args.fromaddr[0] 197 198 req = ether / ip6 / icmp 199 sp.sendp(req, iface=send_if, verbose=False) 200 201def check_tcpsyn(args, packet): 202 dst_ip = args.to[0] 203 204 ip = packet.getlayer(sp.IP) 205 if not ip: 206 return False 207 if ip.dst != dst_ip: 208 return False 209 210 tcp = packet.getlayer(sp.TCP) 211 if not tcp: 212 return False 213 214 # Verify IP checksum 215 chksum = ip.chksum 216 ip.chksum = None 217 new_chksum = sp.IP(sp.raw(ip)).chksum 218 if chksum != new_chksum: 219 print("Expected IP checksum %x but found %x\n" % (new_cshkum, chksum)) 220 return False 221 222 # Verify TCP checksum 223 chksum = tcp.chksum 224 packet_raw = sp.raw(packet) 225 tcp.chksum = None 226 newpacket = sp.Ether(sp.raw(packet[sp.Ether])) 227 new_chksum = newpacket[sp.TCP].chksum 228 if chksum != new_chksum: 229 print("Expected TCP checksum %x but found %x\n" % (new_chksum, chksum)) 230 return False 231 232 return True 233 234def tcpsyn(send_if, dst_ip, args): 235 opts=[('Timestamp', (1, 1)), ('MSS', 1280)] 236 237 if args.tcpopt_unaligned: 238 opts = [('NOP', 0 )] + opts 239 240 ether = sp.Ether() 241 ip = sp.IP(dst=dst_ip) 242 tcp = sp.TCP(dport=666, flags='S', options=opts) 243 244 req = ether / ip / tcp 245 sp.sendp(req, iface=send_if, verbose=False) 246 247 248def main(): 249 parser = argparse.ArgumentParser("pft_ping.py", 250 description="Ping test tool") 251 parser.add_argument('--sendif', nargs=1, 252 required=True, 253 help='The interface through which the packet(s) will be sent') 254 parser.add_argument('--recvif', nargs=1, 255 help='The interface on which to expect the ICMP echo request') 256 parser.add_argument('--replyif', nargs=1, 257 help='The interface on which to expect the ICMP echo response') 258 parser.add_argument('--checkdup', nargs=1, 259 help='The interface on which to expect the duplicated ICMP packets') 260 parser.add_argument('--ip6', action='store_true', 261 help='Use IPv6') 262 parser.add_argument('--to', nargs=1, 263 required=True, 264 help='The destination IP address for the ICMP echo request') 265 parser.add_argument('--fromaddr', nargs=1, 266 help='The source IP address for the ICMP echo request') 267 268 # TCP options 269 parser.add_argument('--tcpsyn', action='store_true', 270 help='Send a TCP SYN packet') 271 parser.add_argument('--tcpopt_unaligned', action='store_true', 272 help='Include unaligned TCP options') 273 274 # Packet settings 275 parser.add_argument('--send-tos', nargs=1, 276 help='Set the ToS value for the transmitted packet') 277 278 # Expectations 279 parser.add_argument('--expect-tos', nargs=1, 280 help='The expected ToS value in the received packet') 281 282 args = parser.parse_args() 283 284 # We may not have a default route. Tell scapy where to start looking for routes 285 sp.conf.iface6 = args.sendif[0] 286 287 sniffer = None 288 if not args.recvif is None: 289 checkfn=check_ping_request 290 if args.tcpsyn: 291 checkfn=check_tcpsyn 292 293 sniffer = Sniffer(args, checkfn) 294 295 replysniffer = None 296 if not args.replyif is None: 297 checkfn=check_ping_reply 298 replysniffer = Sniffer(args, checkfn, recvif=args.replyif[0]) 299 300 dupsniffer = None 301 if args.checkdup is not None: 302 dupsniffer = Sniffer(args, check_dup, recvif=args.checkdup[0]) 303 304 if args.tcpsyn: 305 tcpsyn(args.sendif[0], args.to[0], args) 306 else: 307 if args.ip6: 308 ping6(args.sendif[0], args.to[0], args) 309 else: 310 ping(args.sendif[0], args.to[0], args) 311 312 if dupsniffer: 313 dupsniffer.join() 314 if dup_found != 1: 315 sys.exit(1) 316 317 if sniffer: 318 sniffer.join() 319 320 if sniffer.foundCorrectPacket: 321 sys.exit(0) 322 else: 323 sys.exit(1) 324 325 if replysniffer: 326 replysniffer.join() 327 328 if replysniffer.foundCorrectPacket: 329 sys.exit(0) 330 else: 331 sys.exit(1) 332 333if __name__ == '__main__': 334 main() 335