#!/usr/bin/env python3
#
# SPDX-License-Identifier: BSD-2-Clause
#
# Copyright (c) 2017 Kristof Provost <kp@FreeBSD.org>
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
# 1. Redistributions of source code must retain the above copyright
#    notice, this list of conditions and the following disclaimer.
# 2. Redistributions in binary form must reproduce the above copyright
#    notice, this list of conditions and the following disclaimer in the
#    documentation and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
# SUCH DAMAGE.
#

import argparse
import logging
logging.getLogger("scapy").setLevel(logging.CRITICAL)
import scapy.all as sp
import socket
import sys
from sniffer import Sniffer

# Payload carried by every probe this tool sends; the checkers use it to
# tell our own packets apart from unrelated traffic on the interface.
PAYLOAD_MAGIC = bytes.fromhex('42c0ffee')

# Number of ICMP packets carrying PAYLOAD_MAGIC seen by check_dup().
dup_found = 0


def _raw_payload_matches(packet):
    """
    Return True if the packet has a Raw layer whose load is PAYLOAD_MAGIC
    """
    raw = packet.getlayer(sp.Raw)
    if raw is None:
        return False
    return raw.load == PAYLOAD_MAGIC


def _tos_matches(args, ip):
    """
    Check the IPv4 ToS field against --expect-tos, if that was given

    Prints a diagnostic and returns False on mismatch; returns True when the
    value matches or no expectation was set.
    """
    if not args.expect_tos:
        return True

    expected = int(args.expect_tos[0])
    if ip.tos != expected:
        print("Unexpected ToS value %d, expected %d" \
            % (ip.tos, expected))
        return False
    return True


def _tc_matches(args, ip):
    """
    Check the IPv6 traffic class against --expect-tc, if that was given

    Prints a diagnostic and returns False on mismatch; returns True when the
    value matches or no expectation was set.
    """
    if not args.expect_tc:
        return True

    expected = int(args.expect_tc[0])
    if ip.tc != expected:
        print("Unexpected traffic class value %d, expected %d" \
            % (ip.tc, expected))
        return False
    return True


def check_dup(args, packet):
    """
    Verify that this is an ICMP packet, and that we only see one

    Every ICMP packet carrying PAYLOAD_MAGIC bumps the global dup_found
    counter. The function always returns False; main() inspects dup_found
    after the sniffer has been joined.
    """
    global dup_found

    if packet.getlayer(sp.ICMP) is None:
        return False
    if not _raw_payload_matches(packet):
        return False

    dup_found += 1
    return False


def check_ping_request(args, packet):
    """
    Dispatch to the IPv4 or IPv6 echo request check, depending on --ip6
    """
    if args.ip6:
        return check_ping6_request(args, packet)
    return check_ping4_request(args, packet)


def check_ping4_request(args, packet):
    """
    Verify that the packet matches what we'd have sent
    """
    dst_ip = args.to[0]

    ip = packet.getlayer(sp.IP)
    if ip is None:
        return False
    if ip.dst != dst_ip:
        return False

    icmp = packet.getlayer(sp.ICMP)
    if icmp is None:
        return False
    # .get() so that an ICMP type missing from scapy's name table is simply
    # "not our packet" rather than a KeyError.
    if sp.icmptypes.get(icmp.type) != 'echo-request':
        return False

    if not _raw_payload_matches(packet):
        return False

    # Wait to check expectations until we've established this is the packet we
    # sent.
    return _tos_matches(args, ip)


def check_ping6_request(args, packet):
    """
    Verify that the packet matches what we'd have sent
    """
    dst_ip = args.to[0]

    ip = packet.getlayer(sp.IPv6)
    if ip is None:
        return False
    if ip.dst != dst_ip:
        return False

    icmp = packet.getlayer(sp.ICMPv6EchoRequest)
    if icmp is None:
        return False
    if icmp.data != PAYLOAD_MAGIC:
        return False

    # Wait to check expectations until we've established this is the packet we
    # sent.
    return _tc_matches(args, ip)


def check_ping_reply(args, packet):
    """
    Dispatch to the IPv4 or IPv6 echo reply check, depending on --ip6
    """
    if args.ip6:
        return check_ping6_reply(args, packet)
    return check_ping4_reply(args, packet)


def check_ping4_reply(args, packet):
    """
    Check that this is a reply to the ping request we sent
    """
    dst_ip = args.to[0]

    ip = packet.getlayer(sp.IP)
    if ip is None:
        return False
    # The reply must come from the host we pinged.
    if ip.src != dst_ip:
        return False

    icmp = packet.getlayer(sp.ICMP)
    if icmp is None:
        return False
    # .get() so that an ICMP type missing from scapy's name table is simply
    # "not our packet" rather than a KeyError.
    if sp.icmptypes.get(icmp.type) != 'echo-reply':
        return False

    if not _raw_payload_matches(packet):
        return False

    return _tos_matches(args, ip)


def check_ping6_reply(args, packet):
    """
    Check that this is a reply to the ping request we sent
    """
    dst_ip = args.to[0]

    ip = packet.getlayer(sp.IPv6)
    if ip is None:
        return False
    # The reply must come from the host we pinged.
    if ip.src != dst_ip:
        return False

    icmp = packet.getlayer(sp.ICMPv6EchoReply)
    if icmp is None:
        print("No echo reply!")
        return False

    if icmp.data != PAYLOAD_MAGIC:
        print("data mismatch")
        return False

    return _tc_matches(args, ip)


def ping(send_if, dst_ip, args):
    """
    Send one ICMP echo request carrying PAYLOAD_MAGIC to dst_ip via send_if

    Honours --send-tos and --fromaddr when they are set on args.
    """
    ether = sp.Ether()
    ip = sp.IP(dst=dst_ip)
    icmp = sp.ICMP(type='echo-request')
    raw = sp.raw(PAYLOAD_MAGIC)

    if args.send_tos:
        ip.tos = int(args.send_tos[0])

    if args.fromaddr:
        ip.src = args.fromaddr[0]

    req = ether / ip / icmp / raw
    sp.sendp(req, iface=send_if, verbose=False)


def ping6(send_if, dst_ip, args):
    """
    Send one ICMPv6 echo request carrying PAYLOAD_MAGIC to dst_ip via send_if

    Honours --send-tc and --fromaddr when they are set on args.
    """
    ether = sp.Ether()
    ip6 = sp.IPv6(dst=dst_ip)
    icmp = sp.ICMPv6EchoRequest(data=sp.raw(PAYLOAD_MAGIC))

    if args.send_tc:
        ip6.tc = int(args.send_tc[0])

    if args.fromaddr:
        ip6.src = args.fromaddr[0]

    req = ether / ip6 / icmp
    sp.sendp(req, iface=send_if, verbose=False)


def check_tcpsyn(args, packet):
    """
    Verify that this is an IPv4 TCP packet to our destination with valid
    IP and TCP checksums

    Note that the checksum fields of the passed-in packet are cleared as a
    side effect of the verification.
    """
    dst_ip = args.to[0]

    ip = packet.getlayer(sp.IP)
    if ip is None:
        return False
    if ip.dst != dst_ip:
        return False

    tcp = packet.getlayer(sp.TCP)
    if tcp is None:
        return False

    # Verify IP checksum: clear the field so it is recomputed when the layer
    # is rebuilt, then compare with what was on the wire.
    chksum = ip.chksum
    ip.chksum = None
    new_chksum = sp.IP(sp.raw(ip)).chksum
    if chksum != new_chksum:
        print("Expected IP checksum %x but found %x\n" % (new_chksum, chksum))
        return False

    # Verify TCP checksum, using the same rebuild-and-compare approach on the
    # whole frame.
    chksum = tcp.chksum
    tcp.chksum = None
    newpacket = sp.Ether(sp.raw(packet[sp.Ether]))
    new_chksum = newpacket[sp.TCP].chksum
    if chksum != new_chksum:
        print("Expected TCP checksum %x but found %x\n" % (new_chksum, chksum))
        return False

    return True


def tcpsyn(send_if, dst_ip, args):
    """
    Send one TCP SYN (destination port 666) to dst_ip via send_if

    With --tcpopt_unaligned a leading NOP option is inserted in front of the
    Timestamp and MSS options.
    """
    opts = [('Timestamp', (1, 1)), ('MSS', 1280)]

    if args.tcpopt_unaligned:
        opts = [('NOP', 0)] + opts

    ether = sp.Ether()
    ip = sp.IP(dst=dst_ip)
    tcp = sp.TCP(dport=666, flags='S', options=opts)

    req = ether / ip / tcp
    sp.sendp(req, iface=send_if, verbose=False)


def main():
    """
    Parse the command line, start the requested sniffers, send the probe and
    exit with 0 on success or 1 on failure

    Exit status precedence: a duplicate check failure exits 1 immediately;
    otherwise, if --recvif was given its sniffer alone decides the result,
    and the --replyif sniffer is only consulted when --recvif was not given.
    If no sniffer decides the result, the function returns normally.
    """
    parser = argparse.ArgumentParser("pft_ping.py",
        description="Ping test tool")
    parser.add_argument('--sendif', nargs=1,
        required=True,
        help='The interface through which the packet(s) will be sent')
    parser.add_argument('--recvif', nargs=1,
        help='The interface on which to expect the ICMP echo request')
    parser.add_argument('--replyif', nargs=1,
        help='The interface on which to expect the ICMP echo response')
    parser.add_argument('--checkdup', nargs=1,
        help='The interface on which to expect the duplicated ICMP packets')
    parser.add_argument('--ip6', action='store_true',
        help='Use IPv6')
    parser.add_argument('--to', nargs=1,
        required=True,
        help='The destination IP address for the ICMP echo request')
    parser.add_argument('--fromaddr', nargs=1,
        help='The source IP address for the ICMP echo request')

    # TCP options
    parser.add_argument('--tcpsyn', action='store_true',
        help='Send a TCP SYN packet')
    parser.add_argument('--tcpopt_unaligned', action='store_true',
        help='Include unaligned TCP options')

    # Packet settings
    parser.add_argument('--send-tos', nargs=1,
        help='Set the ToS value for the transmitted packet')
    parser.add_argument('--send-tc', nargs=1,
        help='Set the traffic class value for the transmitted packet')

    # Expectations
    parser.add_argument('--expect-tos', nargs=1,
        help='The expected ToS value in the received packet')
    parser.add_argument('--expect-tc', nargs=1,
        help='The expected traffic class value in the received packet')

    args = parser.parse_args()

    # We may not have a default route. Tell scapy where to start looking for
    # routes
    sp.conf.iface6 = args.sendif[0]

    # Sniffers are created before anything is sent.
    # NOTE(review): presumably Sniffer starts capturing on construction, so
    # the probe cannot be missed - confirm against sniffer.py.
    sniffer = None
    if args.recvif is not None:
        checkfn = check_tcpsyn if args.tcpsyn else check_ping_request
        sniffer = Sniffer(args, checkfn)

    replysniffer = None
    if args.replyif is not None:
        replysniffer = Sniffer(args, check_ping_reply, recvif=args.replyif[0])

    dupsniffer = None
    if args.checkdup is not None:
        dupsniffer = Sniffer(args, check_dup, recvif=args.checkdup[0])

    if args.tcpsyn:
        tcpsyn(args.sendif[0], args.to[0], args)
    elif args.ip6:
        ping6(args.sendif[0], args.to[0], args)
    else:
        ping(args.sendif[0], args.to[0], args)

    if dupsniffer:
        dupsniffer.join()
        # Exactly one matching packet must have been seen.
        if dup_found != 1:
            sys.exit(1)

    if sniffer:
        sniffer.join()
        sys.exit(0 if sniffer.foundCorrectPacket else 1)

    if replysniffer:
        replysniffer.join()
        sys.exit(0 if replysniffer.foundCorrectPacket else 1)


if __name__ == '__main__':
    main()