#!/usr/bin/env python3
#
# SPDX-License-Identifier: BSD-2-Clause
#
# Copyright (c) 2017 Kristof Provost <kp@FreeBSD.org>
# Copyright (c) 2023 Kajetan Staszkiewicz <vegeta@tuxpowered.net>
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
# 1. Redistributions of source code must retain the above copyright
#    notice, this list of conditions and the following disclaimer.
# 2. Redistributions in binary form must reproduce the above copyright
#    notice, this list of conditions and the following disclaimer in the
#    documentation and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
# SUCH DAMAGE.
#

"""Ping test tool.

Sends a single ICMP/ICMPv6 echo request or TCP SYN out of one interface and
verifies, with one sniffer per listed interface, that the expected request
and/or reply packet is seen exactly once.  The process exit status is a
bitmask with one bit set for every sniffer that did not see exactly one
matching packet.
"""

import argparse
import logging
# Must run before scapy is imported so that scapy's import-time warnings
# are suppressed.
logging.getLogger("scapy").setLevel(logging.CRITICAL)
import math
import scapy.all as sp
import sys

from copy import copy
from sniffer import Sniffer

logging.basicConfig(format='%(message)s')
LOGGER = logging.getLogger(__name__)

PAYLOAD_MAGIC = bytes.fromhex('42c0ffee')


def build_payload(l):
    """Return ``l`` bytes consisting of PAYLOAD_MAGIC repeated and truncated."""
    repeats, remainder = divmod(l, len(PAYLOAD_MAGIC))
    return PAYLOAD_MAGIC * repeats + PAYLOAD_MAGIC[:remainder]


def prepare_ipv6(dst_address, send_params):
    """Build an IPv6 header for ``dst_address``.

    Optional ``src_address``, ``hlim`` and ``tc`` entries of ``send_params``
    override the scapy defaults when present.
    """
    src_address = send_params.get('src_address')
    hlim = send_params.get('hlim')
    tc = send_params.get('tc')
    ip6 = sp.IPv6(dst=dst_address)
    if src_address:
        ip6.src = src_address
    # Compare against None so that an explicit value of 0 is honoured.
    if hlim is not None:
        ip6.hlim = hlim
    if tc is not None:
        ip6.tc = tc
    return ip6


def prepare_ipv4(dst_address, send_params):
    """Build an IPv4 header for ``dst_address``.

    Optional ``src_address``, ``flags``, ``tc`` (used as ToS) and ``hlim``
    (used as TTL) entries of ``send_params`` override the scapy defaults
    when present.
    """
    src_address = send_params.get('src_address')
    flags = send_params.get('flags')
    tos = send_params.get('tc')
    ttl = send_params.get('hlim')
    ip = sp.IP(dst=dst_address)
    if src_address:
        ip.src = src_address
    if flags:
        ip.flags = flags
    # Compare against None so that an explicit value of 0 is honoured.
    if tos is not None:
        ip.tos = tos
    if ttl is not None:
        ip.ttl = ttl
    return ip


def send_icmp_ping(dst_address, sendif, send_params):
    """Send one ICMP (or ICMPv6) echo request to ``dst_address`` via ``sendif``.

    The address family is chosen by the presence of ':' in ``dst_address``.
    ``send_params['length']`` is the size of the echo payload.
    """
    payload = sp.raw(build_payload(send_params['length']))
    ether = sp.Ether()
    if ':' in dst_address:
        ip6 = prepare_ipv6(dst_address, send_params)
        req = ether / ip6 / sp.ICMPv6EchoRequest(data=payload)
    else:
        ip = prepare_ipv4(dst_address, send_params)
        req = ether / ip / sp.ICMP(type='echo-request') / payload
    # Pass the interface by keyword: the position of ``iface`` in sendp()'s
    # signature differs between scapy releases.
    sp.sendp(req, iface=sendif, verbose=False)


def send_tcp_syn(dst_address, sendif, send_params):
    """Send one TCP SYN to port 666 of ``dst_address`` via ``sendif``.

    ``send_params`` may carry ``seq``, ``mss`` (default 1280) and
    ``tcpopt_unaligned``; the latter prepends a NOP option so that the
    following options are not aligned.
    """
    tcpopt_unaligned = send_params.get('tcpopt_unaligned')
    seq = send_params.get('seq')
    mss = send_params.get('mss')
    ether = sp.Ether()
    opts = [('Timestamp', (1, 1)), ('MSS', 1280 if mss is None else mss)]
    if tcpopt_unaligned:
        opts = [('NOP', 0)] + opts
    if ':' in dst_address:
        ip = prepare_ipv6(dst_address, send_params)
    else:
        ip = prepare_ipv4(dst_address, send_params)
    tcp = sp.TCP(dport=666, flags='S', options=opts, seq=seq)
    req = ether / ip / tcp
    sp.sendp(req, iface=sendif, verbose=False)


def send_ping(dst_address, sendif, ping_type, send_params):
    """Send a ping of ``ping_type`` ('icmp' or 'tcpsyn').

    Raises Exception for any other ``ping_type``.
    """
    if ping_type == 'icmp':
        send_icmp_ping(dst_address, sendif, send_params)
    elif ping_type == 'tcpsyn':
        send_tcp_syn(dst_address, sendif, send_params)
    else:
        raise Exception('Unsupported ping type')


def check_ipv4(expect_params, packet):
    """Return True if ``packet`` carries an IPv4 header matching expectations.

    Checks source/destination address, header checksum, fragmentation flags,
    ToS (``tc``) and TTL (``hlim``); each is only checked when given in
    ``expect_params``, except the checksum which is always verified.

    Side effect: the checksum field of the packet's IP layer is reset to
    None while recomputing the checksum and is not restored.
    """
    src_address = expect_params.get('src_address')
    dst_address = expect_params.get('dst_address')
    flags = expect_params.get('flags')
    tos = expect_params.get('tc')
    ttl = expect_params.get('hlim')
    ip = packet.getlayer(sp.IP)
    if not ip:
        LOGGER.debug('Packet is not IPv4!')
        return False
    if src_address and ip.src != src_address:
        LOGGER.debug('Source IPv4 address does not match!')
        return False
    if dst_address and ip.dst != dst_address:
        LOGGER.debug('Destination IPv4 address does not match!')
        return False
    # Clearing the field makes scapy recompute the checksum when the layer
    # is serialized; re-parsing the bytes exposes the recomputed value.
    chksum = ip.chksum
    ip.chksum = None
    new_chksum = sp.IP(sp.raw(ip)).chksum
    if chksum != new_chksum:
        LOGGER.debug(f'Expected IP checksum {new_chksum} but found {chksum}')
        return False
    if flags and ip.flags != flags:
        LOGGER.debug(f'Wrong IP flags value {ip.flags}, expected {flags}')
        return False
    # Numeric expectations are compared against None so that an expected
    # value of 0 is actually verified.
    if tos is not None and ip.tos != tos:
        LOGGER.debug(f'Wrong ToS value {ip.tos}, expected {tos}')
        return False
    if ttl is not None and ip.ttl != ttl:
        LOGGER.debug(f'Wrong TTL value {ip.ttl}, expected {ttl}')
        return False
    return True


def check_ipv6(expect_params, packet):
    """Return True if ``packet`` carries an IPv6 header matching expectations.

    Checks source/destination address, Hop Limit (``hlim``) and Traffic
    Class (``tc``), each only when given in ``expect_params``.

    Raises Exception if fragmentation ``flags`` are expected, as IPv6 has
    none.
    """
    src_address = expect_params.get('src_address')
    dst_address = expect_params.get('dst_address')
    flags = expect_params.get('flags')
    hlim = expect_params.get('hlim')
    tc = expect_params.get('tc')
    ip6 = packet.getlayer(sp.IPv6)
    if not ip6:
        LOGGER.debug('Packet is not IPv6!')
        return False
    if src_address and ip6.src != src_address:
        LOGGER.debug('Source IPv6 address does not match!')
        return False
    if dst_address and ip6.dst != dst_address:
        LOGGER.debug('Destination IPv6 address does not match!')
        return False
    # IPv6 has no IP-level checksum.
    if flags:
        raise Exception("There's no fragmentation flags in IPv6")
    # Numeric expectations are compared against None so that an expected
    # value of 0 is actually verified.
    if hlim is not None and ip6.hlim != hlim:
        LOGGER.debug(f'Wrong Hop Limit value {ip6.hlim}, expected {hlim}')
        return False
    if tc is not None and ip6.tc != tc:
        LOGGER.debug(f'Wrong TC value {ip6.tc}, expected {tc}')
        return False
    return True


def check_ping_4(expect_params, packet):
    """Return True if ``packet`` is an IPv4 ICMP packet with the expected payload.

    The ICMP type is not examined here; see check_ping_request_4() and
    check_ping_reply_4().
    """
    expect_length = expect_params['length']
    if not check_ipv4(expect_params, packet):
        return False
    icmp = packet.getlayer(sp.ICMP)
    if not icmp:
        LOGGER.debug('Packet is not IPv4 ICMP!')
        return False
    raw = packet.getlayer(sp.Raw)
    if not raw:
        LOGGER.debug('Packet contains no payload!')
        return False
    if raw.load != build_payload(expect_length):
        LOGGER.debug('Payload magic does not match!')
        return False
    return True


def check_ping_request_4(expect_params, packet):
    """Return True if ``packet`` is the expected IPv4 ICMP echo request."""
    if not check_ping_4(expect_params, packet):
        return False
    icmp = packet.getlayer(sp.ICMP)
    if sp.icmptypes[icmp.type] != 'echo-request':
        LOGGER.debug('Packet is not IPv4 ICMP Echo Request!')
        return False
    return True


def check_ping_reply_4(expect_params, packet):
    """Return True if ``packet`` is the expected IPv4 ICMP echo reply."""
    if not check_ping_4(expect_params, packet):
        return False
    icmp = packet.getlayer(sp.ICMP)
    if sp.icmptypes[icmp.type] != 'echo-reply':
        LOGGER.debug('Packet is not IPv4 ICMP Echo Reply!')
        return False
    return True


def check_ping_request_6(expect_params, packet):
    """Return True if ``packet`` is the expected ICMPv6 echo request."""
    expect_length = expect_params['length']
    if not check_ipv6(expect_params, packet):
        return False
    icmp = packet.getlayer(sp.ICMPv6EchoRequest)
    if not icmp:
        LOGGER.debug('Packet is not IPv6 ICMP Echo Request!')
        return False
    if icmp.data != build_payload(expect_length):
        LOGGER.debug('Payload magic does not match!')
        return False
    return True


def check_ping_reply_6(expect_params, packet):
    """Return True if ``packet`` is the expected ICMPv6 echo reply."""
    expect_length = expect_params['length']
    if not check_ipv6(expect_params, packet):
        return False
    icmp = packet.getlayer(sp.ICMPv6EchoReply)
    if not icmp:
        LOGGER.debug('Packet is not IPv6 ICMP Echo Reply!')
        return False
    if icmp.data != build_payload(expect_length):
        LOGGER.debug('Payload magic does not match!')
        return False
    return True


def _expects_ipv6(expect_params, what):
    """Tell whether the expected addresses select IPv6 rather than IPv4.

    ``what`` names the packet being matched and is only used in the error
    message.  Raises Exception when neither a source nor a destination
    address is expected, as the address family cannot be determined then.
    """
    src_address = expect_params.get('src_address')
    dst_address = expect_params.get('dst_address')
    if not (src_address or dst_address):
        raise Exception(
            f'Source or destination address must be given to match the {what}!')
    return bool(
        (src_address and ':' in src_address) or
        (dst_address and ':' in dst_address)
    )


def check_ping_request(expect_params, packet):
    """Match an echo request, picking IPv4 or IPv6 from the expected addresses."""
    if _expects_ipv6(expect_params, 'ping request'):
        return check_ping_request_6(expect_params, packet)
    return check_ping_request_4(expect_params, packet)


def check_ping_reply(expect_params, packet):
    """Match an echo reply, picking IPv4 or IPv6 from the expected addresses."""
    if _expects_ipv6(expect_params, 'ping reply'):
        return check_ping_reply_6(expect_params, packet)
    return check_ping_reply_4(expect_params, packet)


def check_tcp(expect_params, packet):
    """Return True if ``packet`` carries a TCP header matching expectations.

    Always verifies the TCP checksum; ``tcp_flags``, ``seq`` and ``mss`` are
    checked only when given in ``expect_params``.  An expected ``seq`` is
    compared with the sequence number of a SYN ('S') or with the
    acknowledgment number minus one of a SYN-ACK ('SA').

    Raises Exception when ``seq`` is expected but ``tcp_flags`` is neither
    'S' nor 'SA'.

    Side effect: the checksum field of the packet's TCP layer is reset to
    None while recomputing the checksum and is not restored.
    """
    tcp_flags = expect_params.get('tcp_flags')
    mss = expect_params.get('mss')
    seq = expect_params.get('seq')
    tcp = packet.getlayer(sp.TCP)
    if not tcp:
        LOGGER.debug('Packet is not TCP!')
        return False
    # The TCP checksum covers a pseudo-header, so the whole frame has to be
    # rebuilt and re-parsed to obtain the recomputed value.
    chksum = tcp.chksum
    tcp.chksum = None
    newpacket = sp.Ether(sp.raw(packet[sp.Ether]))
    new_chksum = newpacket[sp.TCP].chksum
    if chksum != new_chksum:
        LOGGER.debug(f'Wrong TCP checksum {chksum}, expected {new_chksum}!')
        return False
    if tcp_flags and tcp.flags != tcp_flags:
        LOGGER.debug(f'Wrong TCP flags {tcp.flags}, expected {tcp_flags}!')
        return False
    if seq is not None:
        if tcp_flags == 'S':
            tcp_seq = tcp.seq
        elif tcp_flags == 'SA':
            # Sequence numbers are 32 bit and wrap around: an initial
            # sequence number of 0xffffffff is acknowledged with 0.
            tcp_seq = (tcp.ack - 1) & 0xffffffff
        else:
            raise Exception(
                'Sequence number can only be matched for SYN or SYN-ACK packets')
        if seq != tcp_seq:
            LOGGER.debug(f'Wrong TCP Sequence Number {tcp_seq}, expected {seq}')
            return False
    if mss is not None:
        # NOTE(review): a packet without any MSS option passes this check;
        # confirm whether a missing option should count as a mismatch.
        for option in tcp.options:
            if option[0] == 'MSS' and option[1] != mss:
                LOGGER.debug(f'Wrong TCP MSS {option[1]}, expected {mss}')
                return False
    return True


def check_tcp_syn_request_4(expect_params, packet):
    """Return True if ``packet`` is the expected IPv4 TCP SYN."""
    if not check_ipv4(expect_params, packet):
        return False
    return check_tcp(expect_params | {'tcp_flags': 'S'}, packet)


def check_tcp_syn_reply_4(expect_params, packet):
    """Return True if ``packet`` is the expected IPv4 TCP SYN-ACK."""
    if not check_ipv4(expect_params, packet):
        return False
    return check_tcp(expect_params | {'tcp_flags': 'SA'}, packet)


def check_tcp_syn_request_6(expect_params, packet):
    """Return True if ``packet`` is the expected IPv6 TCP SYN."""
    if not check_ipv6(expect_params, packet):
        return False
    return check_tcp(expect_params | {'tcp_flags': 'S'}, packet)


def check_tcp_syn_reply_6(expect_params, packet):
    """Return True if ``packet`` is the expected IPv6 TCP SYN-ACK."""
    if not check_ipv6(expect_params, packet):
        return False
    return check_tcp(expect_params | {'tcp_flags': 'SA'}, packet)


def check_tcp_syn_request(expect_params, packet):
    """Match a TCP SYN, picking IPv4 or IPv6 from the expected addresses."""
    if _expects_ipv6(expect_params, 'tcp syn request'):
        return check_tcp_syn_request_6(expect_params, packet)
    return check_tcp_syn_request_4(expect_params, packet)


def check_tcp_syn_reply(expect_params, packet):
    """Match a TCP SYN-ACK, picking IPv4 or IPv6 from the expected addresses."""
    if _expects_ipv6(expect_params, 'tcp syn reply'):
        return check_tcp_syn_reply_6(expect_params, packet)
    return check_tcp_syn_reply_4(expect_params, packet)


def setup_sniffer(recvif, ping_type, sniff_type, expect_params):
    """Create a Sniffer on ``recvif`` for the given ping and sniff type.

    ``ping_type`` is 'icmp' or 'tcpsyn', ``sniff_type`` is 'request' or
    'reply'.  Raises Exception for any other combination.
    """
    check_functions = {
        ('icmp', 'request'): check_ping_request,
        ('icmp', 'reply'): check_ping_reply,
        ('tcpsyn', 'request'): check_tcp_syn_request,
        ('tcpsyn', 'reply'): check_tcp_syn_reply,
    }
    checkfn = check_functions.get((ping_type, sniff_type))
    if checkfn is None:
        raise Exception('Unsupported ping or sniff type')

    return Sniffer(expect_params, checkfn, recvif)


def parse_args():
    """Parse and return the command line arguments."""
    parser = argparse.ArgumentParser("pft_ping.py",
        description="Ping test tool")

    # Parameters of sent ping request
    parser.add_argument('--sendif', nargs=1,
        required=True,
        help='The interface through which the packet(s) will be sent')
    parser.add_argument('--to', nargs=1,
        required=True,
        help='The destination IP address for the ping request')
    parser.add_argument('--ping-type',
        choices=('icmp', 'tcpsyn'),
        help='Type of ping: ICMP (default) or TCP SYN',
        default='icmp')
    parser.add_argument('--fromaddr', nargs=1,
        help='The source IP address for the ping request')

    # Where to look for packets to analyze.
    # The '+' format is ugly as it mixes positional with optional syntax.
    # But we have no positional parameters so I guess it's fine to use it.
    parser.add_argument('--recvif', nargs='+',
        help='The interfaces on which to expect the ping request')
    parser.add_argument('--replyif', nargs='+',
        help='The interfaces which to expect the ping response')

    # Packet settings
    parser_send = parser.add_argument_group('Values set in transmitted packets')
    parser_send.add_argument('--send-flags', nargs=1, type=str,
        help='IPv4 fragmentation flags')
    parser_send.add_argument('--send-hlim', nargs=1, type=int,
        help='IPv6 Hop Limit or IPv4 Time To Live')
    parser_send.add_argument('--send-mss', nargs=1, type=int,
        help='TCP Maximum Segment Size')
    parser_send.add_argument('--send-seq', nargs=1, type=int,
        help='TCP sequence number')
    parser_send.add_argument('--send-length', nargs=1, type=int,
        default=[len(PAYLOAD_MAGIC)], help='ICMP Echo Request payload size')
    parser_send.add_argument('--send-tc', nargs=1, type=int,
        help='IPv6 Traffic Class or IPv4 DiffServ / ToS')
    parser_send.add_argument('--send-tcpopt-unaligned', action='store_true',
        help='Include unaligned TCP options')

    # Expectations
    parser_expect = parser.add_argument_group('Values expected in sniffed packets')
    parser_expect.add_argument('--expect-flags', nargs=1, type=str,
        help='IPv4 fragmentation flags')
    parser_expect.add_argument('--expect-hlim', nargs=1, type=int,
        help='IPv6 Hop Limit or IPv4 Time To Live')
    parser_expect.add_argument('--expect-mss', nargs=1, type=int,
        help='TCP Maximum Segment Size')
    # Belongs to the expectations group, like all other --expect-* options.
    parser_expect.add_argument('--expect-seq', nargs=1, type=int,
        help='TCP sequence number')
    parser_expect.add_argument('--expect-tc', nargs=1, type=int,
        help='IPv6 Traffic Class or IPv4 DiffServ / ToS')

    parser.add_argument('-v', '--verbose', action='store_true',
        help=('Enable verbose logging. Apart of potentially useful information '
            'you might see warnings from parsing packets like NDP or other '
            'packets not related to the test being run. Use only when '
            'developing because real tests expect empty stderr and stdout.'))

    return parser.parse_args()


def main():
    """Run the test and return the exit status.

    Installs one sniffer per ``--recvif`` interface (matching the request)
    and per ``--replyif`` interface (matching the reply), sends the ping and
    waits for all sniffers.  The returned integer has bit N set when the
    N-th sniffer (receive interfaces first, then reply interfaces, in
    command line order) did not see exactly one matching packet; 0 means
    success.

    Raises Exception when neither ``--recvif`` nor ``--replyif`` is given.
    """
    args = parse_args()

    if args.verbose:
        LOGGER.setLevel(logging.DEBUG)

    # Dig out real values of program arguments
    send_if = args.sendif[0]
    reply_ifs = args.replyif
    recv_ifs = args.recvif
    dst_address = args.to[0]

    # Standardize parameters which have nargs=1.
    arg_values = vars(args)
    send_params = {}
    expect_params = {}
    for param_name in ('flags', 'hlim', 'length', 'mss', 'seq', 'tc'):
        param_arg = arg_values.get(f'send_{param_name}')
        send_params[param_name] = param_arg[0] if param_arg else None
        param_arg = arg_values.get(f'expect_{param_name}')
        expect_params[param_name] = param_arg[0] if param_arg else None

    # There is no --expect-length option: the payload is expected to arrive
    # with the length it has been sent with.
    expect_params['length'] = send_params['length']
    send_params['tcpopt_unaligned'] = args.send_tcpopt_unaligned
    send_params['src_address'] = args.fromaddr[0] if args.fromaddr else None

    # We may not have a default route. Tell scapy where to start looking for routes
    sp.conf.iface6 = send_if

    # Configuration sanity checking.
    if not (reply_ifs or recv_ifs):
        raise Exception('With no reply or recv interface specified no traffic '
            'can be sniffed and verified!'
        )

    sniffers = []

    if recv_ifs:
        # Requests are recognized by their destination address.
        sniffer_params = copy(expect_params)
        sniffer_params['src_address'] = None
        sniffer_params['dst_address'] = dst_address
        for iface in recv_ifs:
            LOGGER.debug(f'Installing receive sniffer on {iface}')
            sniffers.append(
                setup_sniffer(iface, args.ping_type, 'request', sniffer_params))

    if reply_ifs:
        # Replies are recognized by their source address.
        sniffer_params = copy(expect_params)
        sniffer_params['src_address'] = dst_address
        sniffer_params['dst_address'] = None
        for iface in reply_ifs:
            LOGGER.debug(f'Installing reply sniffer on {iface}')
            sniffers.append(
                setup_sniffer(iface, args.ping_type, 'reply', sniffer_params))

    LOGGER.debug(f'Installed {len(sniffers)} sniffers')

    send_ping(dst_address, send_if, args.ping_type, send_params)

    err = 0
    for sniffer_num, sniffer in enumerate(sniffers):
        sniffer.join()
        if sniffer.correctPackets == 1:
            LOGGER.debug(f'Expected ping has been sniffed on {sniffer._recvif}.')
            continue
        # Set a bit in err for each failed sniffer.
        err |= 1 << sniffer_num
        if sniffer.correctPackets > 1:
            LOGGER.debug(f'Duplicated ping has been sniffed on {sniffer._recvif}!')
        else:
            LOGGER.debug(f'Expected ping has not been sniffed on {sniffer._recvif}!')

    return err


if __name__ == '__main__':
    sys.exit(main())