1#!/usr/bin/env python3 2# 3# SPDX-License-Identifier: BSD-2-Clause 4# 5# Copyright (c) 2017 Kristof Provost <kp@FreeBSD.org> 6# Copyright (c) 2023 Kajetan Staszkiewicz <vegeta@tuxpowered.net> 7# 8# Redistribution and use in source and binary forms, with or without 9# modification, are permitted provided that the following conditions 10# are met: 11# 1. Redistributions of source code must retain the above copyright 12# notice, this list of conditions and the following disclaimer. 13# 2. Redistributions in binary form must reproduce the above copyright 14# notice, this list of conditions and the following disclaimer in the 15# documentation and/or other materials provided with the distribution. 16# 17# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND 18# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 19# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 20# ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE 21# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL 22# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS 23# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) 24# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 25# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY 26# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 27# SUCH DAMAGE. 28# 29 30import argparse 31import logging 32logging.getLogger("scapy").setLevel(logging.CRITICAL) 33import math 34import scapy.all as sp 35import sys 36 37from copy import copy 38from sniffer import Sniffer 39 40logging.basicConfig(format='%(message)s') 41LOGGER = logging.getLogger(__name__) 42 43PAYLOAD_MAGIC = bytes.fromhex('42c0ffee') 44 45def build_payload(l): 46 pl = len(PAYLOAD_MAGIC) 47 ret = PAYLOAD_MAGIC * math.floor(l/pl) 48 ret += PAYLOAD_MAGIC[0:(l % pl)] 49 return ret 50 51 52def prepare_ipv6(dst_address, send_params): 53 src_address = send_params.get('src_address') 54 hlim = send_params.get('hlim') 55 tc = send_params.get('tc') 56 ip6 = sp.IPv6(dst=dst_address) 57 if src_address: 58 ip6.src = src_address 59 if hlim: 60 ip6.hlim = hlim 61 if tc: 62 ip6.tc = tc 63 return ip6 64 65 66def prepare_ipv4(dst_address, send_params): 67 src_address = send_params.get('src_address') 68 flags = send_params.get('flags') 69 tos = send_params.get('tc') 70 ttl = send_params.get('hlim') 71 opt = send_params.get('nop') 72 options = '' 73 if opt: 74 options='\x00' 75 ip = sp.IP(dst=dst_address, options=options) 76 if src_address: 77 ip.src = src_address 78 if flags: 79 ip.flags = flags 80 if tos: 81 ip.tos = tos 82 if ttl: 83 ip.ttl = ttl 84 return ip 85 86 87def send_icmp_ping(dst_address, sendif, send_params): 88 send_length = send_params['length'] 89 send_frag_length = send_params['frag_length'] 90 packets = [] 91 ether = sp.Ether() 92 if ':' in dst_address: 93 ip6 = prepare_ipv6(dst_address, send_params) 94 icmp = sp.ICMPv6EchoRequest(data=sp.raw(build_payload(send_length))) 95 if send_frag_length: 96 for packet in sp.fragment(ip6 / icmp, fragsize=send_frag_length): 97 packets.append(ether / packet) 98 else: 99 packets.append(ether / ip6 / icmp) 100 101 else: 102 ip = prepare_ipv4(dst_address, send_params) 103 icmp = sp.ICMP(type='echo-request') 104 raw = sp.raw(build_payload(send_length)) 105 if send_frag_length: 106 for packet in sp.fragment(ip / icmp / raw, fragsize=send_frag_length): 107 packets.append(ether / packet) 108 else: 109 packets.append(ether / ip / icmp / raw) 110 for packet in packets: 111 sp.sendp(packet, sendif, verbose=False) 112 113 114def send_tcp_syn(dst_address, sendif, send_params): 115 tcpopt_unaligned = send_params.get('tcpopt_unaligned') 116 seq = send_params.get('seq') 117 mss = send_params.get('mss') 118 ether = sp.Ether() 119 opts=[('Timestamp', (1, 1)), ('MSS', mss if mss else 1280)] 120 if tcpopt_unaligned: 121 opts = [('NOP', 0 )] + opts 122 if ':' in dst_address: 123 ip = prepare_ipv6(dst_address, send_params) 124 else: 125 ip = prepare_ipv4(dst_address, send_params) 126 tcp = sp.TCP(dport=666, flags='S', options=opts, seq=seq) 127 req = ether / ip / tcp 128 sp.sendp(req, iface=sendif, verbose=False) 129 130 131def send_ping(dst_address, sendif, ping_type, send_params): 132 if ping_type == 'icmp': 133 send_icmp_ping(dst_address, sendif, send_params) 134 elif ping_type == 'tcpsyn': 135 send_tcp_syn(dst_address, sendif, send_params) 136 else: 137 raise Exception('Unspported ping type') 138 139 140def check_ipv4(expect_params, packet): 141 src_address = expect_params.get('src_address') 142 dst_address = expect_params.get('dst_address') 143 flags = expect_params.get('flags') 144 tos = expect_params.get('tc') 145 ttl = expect_params.get('hlim') 146 ip = packet.getlayer(sp.IP) 147 if not ip: 148 LOGGER.debug('Packet is not IPv4!') 149 return False 150 if src_address and ip.src != src_address: 151 LOGGER.debug('Source IPv4 address does not match!') 152 return False 153 if dst_address and ip.dst != dst_address: 154 LOGGER.debug('Destination IPv4 address does not match!') 155 return False 156 chksum = ip.chksum 157 ip.chksum = None 158 new_chksum = sp.IP(sp.raw(ip)).chksum 159 if chksum != new_chksum: 160 LOGGER.debug(f'Expected IP checksum {new_chksum} but found {chksum}') 161 return False 162 if flags and ip.flags != flags: 163 LOGGER.debug(f'Wrong IP flags value {ip.flags}, expected {flags}') 164 return False 165 if tos and ip.tos != tos: 166 LOGGER.debug(f'Wrong ToS value {ip.tos}, expected {tos}') 167 return False 168 if ttl and ip.ttl != ttl: 169 LOGGER.debug(f'Wrong TTL value {ip.ttl}, expected {ttl}') 170 return False 171 return True 172 173 174def check_ipv6(expect_params, packet): 175 src_address = expect_params.get('src_address') 176 dst_address = expect_params.get('dst_address') 177 flags = expect_params.get('flags') 178 hlim = expect_params.get('hlim') 179 tc = expect_params.get('tc') 180 ip6 = packet.getlayer(sp.IPv6) 181 if not ip6: 182 LOGGER.debug('Packet is not IPv6!') 183 return False 184 if src_address and ip6.src != src_address: 185 LOGGER.debug('Source IPv6 address does not match!') 186 return False 187 if dst_address and ip6.dst != dst_address: 188 LOGGER.debug('Destination IPv6 address does not match!') 189 return False 190 # IPv6 has no IP-level checksum. 191 if flags: 192 raise Exception("There's no fragmentation flags in IPv6") 193 if hlim and ip6.hlim != hlim: 194 LOGGER.debug(f'Wrong Hop Limit value {ip6.hlim}, expected {hlim}') 195 return False 196 if tc and ip6.tc != tc: 197 LOGGER.debug(f'Wrong TC value {ip6.tc}, expected {tc}') 198 return False 199 return True 200 201 202def check_ping_4(expect_params, packet): 203 expect_length = expect_params['length'] 204 if not check_ipv4(expect_params, packet): 205 return False 206 icmp = packet.getlayer(sp.ICMP) 207 if not icmp: 208 LOGGER.debug('Packet is not IPv4 ICMP!') 209 return False 210 raw = packet.getlayer(sp.Raw) 211 if not raw: 212 LOGGER.debug('Packet contains no payload!') 213 return False 214 if raw.load != build_payload(expect_length): 215 LOGGER.debug('Payload magic does not match!') 216 return False 217 return True 218 219 220def check_ping_request_4(expect_params, packet): 221 if not check_ping_4(expect_params, packet): 222 return False 223 icmp = packet.getlayer(sp.ICMP) 224 if sp.icmptypes[icmp.type] != 'echo-request': 225 LOGGER.debug('Packet is not IPv4 ICMP Echo Request!') 226 return False 227 return True 228 229 230def check_ping_reply_4(expect_params, packet): 231 if not check_ping_4(expect_params, packet): 232 return False 233 icmp = packet.getlayer(sp.ICMP) 234 if sp.icmptypes[icmp.type] != 'echo-reply': 235 LOGGER.debug('Packet is not IPv4 ICMP Echo Reply!') 236 return False 237 return True 238 239 240def check_ping_request_6(expect_params, packet): 241 expect_length = expect_params['length'] 242 if not check_ipv6(expect_params, packet): 243 return False 244 icmp = packet.getlayer(sp.ICMPv6EchoRequest) 245 if not icmp: 246 LOGGER.debug('Packet is not IPv6 ICMP Echo Request!') 247 return False 248 if icmp.data != build_payload(expect_length): 249 LOGGER.debug('Payload magic does not match!') 250 return False 251 return True 252 253 254def check_ping_reply_6(expect_params, packet): 255 expect_length = expect_params['length'] 256 if not check_ipv6(expect_params, packet): 257 return False 258 icmp = packet.getlayer(sp.ICMPv6EchoReply) 259 if not icmp: 260 LOGGER.debug('Packet is not IPv6 ICMP Echo Reply!') 261 return False 262 if icmp.data != build_payload(expect_length): 263 LOGGER.debug('Payload magic does not match!') 264 return False 265 return True 266 267 268def check_ping_request(expect_params, packet): 269 src_address = expect_params.get('src_address') 270 dst_address = expect_params.get('dst_address') 271 if not (src_address or dst_address): 272 raise Exception('Source or destination address must be given to match the ping request!') 273 if ( 274 (src_address and ':' in src_address) or 275 (dst_address and ':' in dst_address) 276 ): 277 return check_ping_request_6(expect_params, packet) 278 else: 279 return check_ping_request_4(expect_params, packet) 280 281 282def check_ping_reply(expect_params, packet): 283 src_address = expect_params.get('src_address') 284 dst_address = expect_params.get('dst_address') 285 if not (src_address or dst_address): 286 raise Exception('Source or destination address must be given to match the ping reply!') 287 if ( 288 (src_address and ':' in src_address) or 289 (dst_address and ':' in dst_address) 290 ): 291 return check_ping_reply_6(expect_params, packet) 292 else: 293 return check_ping_reply_4(expect_params, packet) 294 295 296def check_tcp(expect_params, packet): 297 tcp_flags = expect_params.get('tcp_flags') 298 mss = expect_params.get('mss') 299 seq = expect_params.get('seq') 300 tcp = packet.getlayer(sp.TCP) 301 if not tcp: 302 LOGGER.debug('Packet is not TCP!') 303 return False 304 chksum = tcp.chksum 305 tcp.chksum = None 306 newpacket = sp.Ether(sp.raw(packet[sp.Ether])) 307 new_chksum = newpacket[sp.TCP].chksum 308 if chksum != new_chksum: 309 LOGGER.debug(f'Wrong TCP checksum {chksum}, expected {new_chksum}!') 310 return False 311 if tcp_flags and tcp.flags != tcp_flags: 312 LOGGER.debug(f'Wrong TCP flags {tcp.flags}, expected {tcp_flags}!') 313 return False 314 if seq: 315 if tcp_flags == 'S': 316 tcp_seq = tcp.seq 317 elif tcp_flags == 'SA': 318 tcp_seq = tcp.ack - 1 319 if seq != tcp_seq: 320 LOGGER.debug(f'Wrong TCP Sequence Number {tcp_seq}, expected {seq}') 321 return False 322 if mss: 323 for option in tcp.options: 324 if option[0] == 'MSS': 325 if option[1] != mss: 326 LOGGER.debug(f'Wrong TCP MSS {option[1]}, expected {mss}') 327 return False 328 return True 329 330 331def check_tcp_syn_request_4(expect_params, packet): 332 if not check_ipv4(expect_params, packet): 333 return False 334 if not check_tcp(expect_params | {'tcp_flags': 'S'}, packet): 335 return False 336 return True 337 338 339def check_tcp_syn_reply_4(expect_params, packet): 340 if not check_ipv4(expect_params, packet): 341 return False 342 if not check_tcp(expect_params | {'tcp_flags': 'SA'}, packet): 343 return False 344 return True 345 346 347def check_tcp_syn_request_6(expect_params, packet): 348 if not check_ipv6(expect_params, packet): 349 return False 350 if not check_tcp(expect_params | {'tcp_flags': 'S'}, packet): 351 return False 352 return True 353 354 355def check_tcp_syn_reply_6(expect_params, packet): 356 if not check_ipv6(expect_params, packet): 357 return False 358 if not check_tcp(expect_params | {'tcp_flags': 'SA'}, packet): 359 return False 360 return True 361 362 363def check_tcp_syn_request(expect_params, packet): 364 src_address = expect_params.get('src_address') 365 dst_address = expect_params.get('dst_address') 366 if not (src_address or dst_address): 367 raise Exception('Source or destination address must be given to match the tcp syn request!') 368 if ( 369 (src_address and ':' in src_address) or 370 (dst_address and ':' in dst_address) 371 ): 372 return check_tcp_syn_request_6(expect_params, packet) 373 else: 374 return check_tcp_syn_request_4(expect_params, packet) 375 376 377def check_tcp_syn_reply(expect_params, packet): 378 src_address = expect_params.get('src_address') 379 dst_address = expect_params.get('dst_address') 380 if not (src_address or dst_address): 381 raise Exception('Source or destination address must be given to match the tcp syn reply!') 382 if ( 383 (src_address and ':' in src_address) or 384 (dst_address and ':' in dst_address) 385 ): 386 return check_tcp_syn_reply_6(expect_params, packet) 387 else: 388 return check_tcp_syn_reply_4(expect_params, packet) 389 390 391def setup_sniffer(recvif, ping_type, sniff_type, expect_params, defrag): 392 if ping_type == 'icmp' and sniff_type == 'request': 393 checkfn = check_ping_request 394 elif ping_type == 'icmp' and sniff_type == 'reply': 395 checkfn = check_ping_reply 396 elif ping_type == 'tcpsyn' and sniff_type == 'request': 397 checkfn = check_tcp_syn_request 398 elif ping_type == 'tcpsyn' and sniff_type == 'reply': 399 checkfn = check_tcp_syn_reply 400 else: 401 raise Exception('Unspported ping or sniff type') 402 403 return Sniffer(expect_params, checkfn, recvif, defrag=defrag) 404 405 406def parse_args(): 407 parser = argparse.ArgumentParser("pft_ping.py", 408 description="Ping test tool") 409 410 # Parameters of sent ping request 411 parser.add_argument('--sendif', nargs=1, 412 required=True, 413 help='The interface through which the packet(s) will be sent') 414 parser.add_argument('--to', nargs=1, 415 required=True, 416 help='The destination IP address for the ping request') 417 parser.add_argument('--ping-type', 418 choices=('icmp', 'tcpsyn'), 419 help='Type of ping: ICMP (default) or TCP SYN', 420 default='icmp') 421 parser.add_argument('--fromaddr', nargs=1, 422 help='The source IP address for the ping request') 423 424 # Where to look for packets to analyze. 425 # The '+' format is ugly as it mixes positional with optional syntax. 426 # But we have no positional parameters so I guess it's fine to use it. 427 parser.add_argument('--recvif', nargs='+', 428 help='The interfaces on which to expect the ping request') 429 parser.add_argument('--replyif', nargs='+', 430 help='The interfaces which to expect the ping response') 431 432 # Packet settings 433 parser_send = parser.add_argument_group('Values set in transmitted packets') 434 parser_send.add_argument('--send-flags', nargs=1, type=str, 435 help='IPv4 fragmentation flags') 436 parser_send.add_argument('--send-frag-length', nargs=1, type=int, 437 help='Force IP fragmentation with given fragment length') 438 parser_send.add_argument('--send-hlim', nargs=1, type=int, 439 help='IPv6 Hop Limit or IPv4 Time To Live') 440 parser_send.add_argument('--send-mss', nargs=1, type=int, 441 help='TCP Maximum Segment Size') 442 parser_send.add_argument('--send-seq', nargs=1, type=int, 443 help='TCP sequence number') 444 parser_send.add_argument('--send-length', nargs=1, type=int, 445 default=[len(PAYLOAD_MAGIC)], help='ICMP Echo Request payload size') 446 parser_send.add_argument('--send-tc', nargs=1, type=int, 447 help='IPv6 Traffic Class or IPv4 DiffServ / ToS') 448 parser_send.add_argument('--send-tcpopt-unaligned', action='store_true', 449 help='Include unaligned TCP options') 450 parser_send.add_argument('--send-nop', action='store_true', 451 help='Include a NOP IPv4 option') 452 453 # Expectations 454 parser_expect = parser.add_argument_group('Values expected in sniffed packets') 455 parser_expect.add_argument('--expect-flags', nargs=1, type=str, 456 help='IPv4 fragmentation flags') 457 parser_expect.add_argument('--expect-hlim', nargs=1, type=int, 458 help='IPv6 Hop Limit or IPv4 Time To Live') 459 parser_expect.add_argument('--expect-mss', nargs=1, type=int, 460 help='TCP Maximum Segment Size') 461 parser_send.add_argument('--expect-seq', nargs=1, type=int, 462 help='TCP sequence number') 463 parser_expect.add_argument('--expect-tc', nargs=1, type=int, 464 help='IPv6 Traffic Class or IPv4 DiffServ / ToS') 465 466 parser.add_argument('-v', '--verbose', action='store_true', 467 help=('Enable verbose logging. Apart of potentially useful information ' 468 'you might see warnings from parsing packets like NDP or other ' 469 'packets not related to the test being run. Use only when ' 470 'developing because real tests expect empty stderr and stdout.')) 471 472 return parser.parse_args() 473 474 475def main(): 476 args = parse_args() 477 478 if args.verbose: 479 LOGGER.setLevel(logging.DEBUG) 480 481 # Dig out real values of program arguments 482 send_if = args.sendif[0] 483 reply_ifs = args.replyif 484 recv_ifs = args.recvif 485 dst_address = args.to[0] 486 487 # Standardize parameters which have nargs=1. 488 send_params = {} 489 expect_params = {} 490 for param_name in ('flags', 'hlim', 'length', 'mss', 'seq', 'tc', 'frag_length'): 491 param_arg = vars(args).get(f'send_{param_name}') 492 send_params[param_name] = param_arg[0] if param_arg else None 493 param_arg = vars(args).get(f'expect_{param_name}') 494 expect_params[param_name] = param_arg[0] if param_arg else None 495 496 expect_params['length'] = send_params['length'] 497 send_params['tcpopt_unaligned'] = args.send_tcpopt_unaligned 498 send_params['nop'] = args.send_nop 499 send_params['src_address'] = args.fromaddr[0] if args.fromaddr else None 500 501 # We may not have a default route. Tell scapy where to start looking for routes 502 sp.conf.iface6 = send_if 503 504 # Configuration sanity checking. 505 if not (reply_ifs or recv_ifs): 506 raise Exception('With no reply or recv interface specified no traffic ' 507 'can be sniffed and verified!' 508 ) 509 510 sniffers = [] 511 512 if send_params['frag_length']: 513 defrag = True 514 else: 515 defrag = False 516 517 if recv_ifs: 518 sniffer_params = copy(expect_params) 519 sniffer_params['src_address'] = None 520 sniffer_params['dst_address'] = dst_address 521 for iface in recv_ifs: 522 LOGGER.debug(f'Installing receive sniffer on {iface}') 523 sniffers.append( 524 setup_sniffer(iface, args.ping_type, 'request', 525 sniffer_params, defrag, 526 )) 527 528 if reply_ifs: 529 sniffer_params = copy(expect_params) 530 sniffer_params['src_address'] = dst_address 531 sniffer_params['dst_address'] = None 532 for iface in reply_ifs: 533 LOGGER.debug(f'Installing reply sniffer on {iface}') 534 sniffers.append( 535 setup_sniffer(iface, args.ping_type, 'reply', 536 sniffer_params, defrag, 537 )) 538 539 LOGGER.debug(f'Installed {len(sniffers)} sniffers') 540 541 send_ping(dst_address, send_if, args.ping_type, send_params) 542 543 err = 0 544 sniffer_num = 0 545 for sniffer in sniffers: 546 sniffer.join() 547 if sniffer.correctPackets == 1: 548 LOGGER.debug(f'Expected ping has been sniffed on {sniffer._recvif}.') 549 else: 550 # Set a bit in err for each failed sniffer. 551 err |= 1<<sniffer_num 552 if sniffer.correctPackets > 1: 553 LOGGER.debug(f'Duplicated ping has been sniffed on {sniffer._recvif}!') 554 else: 555 LOGGER.debug(f'Expected ping has not been sniffed on {sniffer._recvif}!') 556 sniffer_num += 1 557 558 return err 559 560 561if __name__ == '__main__': 562 sys.exit(main()) 563