#!/usr/bin/env python3
#
# SPDX-License-Identifier: BSD-2-Clause
#
# Copyright (c) 2017 Kristof Provost <kp@FreeBSD.org>
# Copyright (c) 2023 Kajetan Staszkiewicz <vegeta@tuxpowered.net>
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
# 1. Redistributions of source code must retain the above copyright
#    notice, this list of conditions and the following disclaimer.
# 2. Redistributions in binary form must reproduce the above copyright
#    notice, this list of conditions and the following disclaimer in the
#    documentation and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
# SUCH DAMAGE.
#

"""Send a ping (ICMP, TCP SYN, TCP 3-way handshake or UDP) and verify it.

The packet described by the ``--send-*`` options is transmitted on
``--sendif``.  Sniffers installed on the ``--recvif`` / ``--replyif``
interfaces check captured packets against the ``--expect-*`` options.
The process exit status is a bitmask with one bit set per sniffer that did
not see exactly one matching packet.
"""

import argparse
import logging
# Raise the threshold of the "scapy" logger before scapy itself is imported.
logging.getLogger("scapy").setLevel(logging.CRITICAL)
import math
import scapy.all as sp
import sys

from copy import copy
from sniffer import Sniffer

logging.basicConfig(format='%(message)s')
LOGGER = logging.getLogger(__name__)

# Byte pattern that is repeated (and truncated) to fill ping payloads.
PAYLOAD_MAGIC = bytes.fromhex('42c0ffee')


def build_payload(l):
    """Return ``l`` bytes consisting of PAYLOAD_MAGIC repeated and truncated."""
    magic_len = len(PAYLOAD_MAGIC)
    whole = PAYLOAD_MAGIC * math.floor(l / magic_len)
    return whole + PAYLOAD_MAGIC[:l % magic_len]


def clean_params(params):
    """Return a shallow copy of ``params`` without addresses and IP flags.

    The 'src_address', 'dst_address' and 'flags' keys are dropped so the
    caller can fill in its own addresses.  Keys that are already absent are
    ignored.
    """
    ret = copy(params)
    for key in ('src_address', 'dst_address', 'flags'):
        ret.pop(key, None)
    return ret


def _is_ipv6_address(address):
    """Tell whether ``address`` is IPv6, judged by the presence of ':'."""
    return ':' in address


def _expects_ipv6(expect_params, what):
    """Decide whether the expected packet is IPv6.

    ``what`` names the packet being matched and is only used in the error
    message.  Raises Exception when neither a source nor a destination
    address is present in ``expect_params``.
    """
    src_address = expect_params.get('src_address')
    dst_address = expect_params.get('dst_address')
    if not (src_address or dst_address):
        raise Exception(
            f'Source or destination address must be given to match the {what}!'
        )
    return bool(
        (src_address and ':' in src_address) or
        (dst_address and ':' in dst_address)
    )


def prepare_ipv6(send_params):
    """Build the IPv6 header described by ``send_params``.

    Source address, hop limit ('hlim') and traffic class ('tc') are only set
    when given and truthy; otherwise scapy's defaults stay in place.
    """
    ip6 = sp.IPv6(dst=send_params.get('dst_address'))
    src_address = send_params.get('src_address')
    hlim = send_params.get('hlim')
    tc = send_params.get('tc')
    if src_address:
        ip6.src = src_address
    if hlim:
        ip6.hlim = hlim
    if tc:
        ip6.tc = tc
    return ip6


def prepare_ipv4(send_params):
    """Build the IPv4 header described by ``send_params``.

    'tc' is used as ToS and 'hlim' as TTL.  Optional fields are only set when
    given and truthy; otherwise scapy's defaults stay in place.
    """
    # NOTE(review): 0x00 is the End of Option List type in RFC 791 (NOP is
    # 0x01) -- confirm this is the intended option byte for 'nop'.
    options = '\x00' if send_params.get('nop') else ''
    ip = sp.IP(dst=send_params.get('dst_address'), options=options)
    src_address = send_params.get('src_address')
    flags = send_params.get('flags')
    tos = send_params.get('tc')
    ttl = send_params.get('hlim')
    if src_address:
        ip.src = src_address
    if flags:
        ip.flags = flags
    if tos:
        ip.tos = tos
    if ttl:
        ip.ttl = ttl
    return ip


def _to_frames(ip_packet, frag_length, ipv6):
    """Wrap ``ip_packet`` in Ethernet, fragmenting it first if requested."""
    ether = sp.Ether()
    if not frag_length:
        return [ether / ip_packet]
    if ipv6:
        fragments = sp.fragment6(ip_packet, fragSize=frag_length)
    else:
        fragments = sp.fragment(ip_packet, fragsize=frag_length)
    return [ether / fragment for fragment in fragments]


def _send_frames(frames, sendif):
    """Transmit every frame on interface ``sendif``."""
    for frame in frames:
        sp.sendp(frame, iface=sendif, verbose=False)


def send_icmp_ping(send_params):
    """Send an ICMP (v4 or v6) echo request carrying the magic payload.

    The IP version follows the destination address.  When 'frag_length' is
    set the packet is sent as fragments of that size.
    """
    payload = sp.raw(build_payload(send_params['length']))
    frag_length = send_params['frag_length']
    ipv6 = _is_ipv6_address(send_params['dst_address'])
    if ipv6:
        packet = prepare_ipv6(send_params) / sp.ICMPv6EchoRequest(data=payload)
    else:
        packet = (
            prepare_ipv4(send_params) / sp.ICMP(type='echo-request') / payload
        )
    _send_frames(_to_frames(packet, frag_length, ipv6), send_params['sendif'])


def send_tcp_syn(send_params):
    """Send a single TCP SYN with Timestamp and MSS options.

    The MSS defaults to 1280 when 'mss' is not given.  With
    'tcpopt_unaligned' a NOP option is prepended to the option list.
    """
    mss = send_params.get('mss')
    opts = [('Timestamp', (1, 1)), ('MSS', mss if mss else 1280)]
    if send_params.get('tcpopt_unaligned'):
        opts = [('NOP', 0)] + opts
    if _is_ipv6_address(send_params['dst_address']):
        ip = prepare_ipv6(send_params)
    else:
        ip = prepare_ipv4(send_params)
    tcp = sp.TCP(
        sport=send_params.get('sport'), dport=send_params.get('dport'),
        flags='S', options=opts, seq=send_params.get('seq'),
    )
    req = sp.Ether() / ip / tcp
    sp.sendp(req, iface=send_params['sendif'], verbose=False)


def send_udp(send_params):
    """Send a UDP datagram carrying the magic payload.

    The IP version follows the destination address.  When 'frag_length' is
    set the packet is sent as fragments of that size.
    """
    LOGGER.debug('Sending UDP ping')
    payload = sp.Raw(load=build_payload(send_params['length']))
    frag_length = send_params['frag_length']
    ipv6 = _is_ipv6_address(send_params['dst_address'])
    ip = prepare_ipv6(send_params) if ipv6 else prepare_ipv4(send_params)
    udp = sp.UDP(
        sport=send_params.get('sport'), dport=send_params.get('dport'),
    )
    frames = _to_frames(ip / udp / payload, frag_length, ipv6)
    _send_frames(frames, send_params['sendif'])


def send_ping(ping_type, send_params):
    """Send the ping selected by ``ping_type``.

    'tcpsyn' and 'tcp3way' both start with a plain SYN.  Raises Exception for
    an unknown ping type.
    """
    if ping_type == 'icmp':
        send_icmp_ping(send_params)
    elif ping_type in ('tcpsyn', 'tcp3way'):
        send_tcp_syn(send_params)
    elif ping_type == 'udp':
        send_udp(send_params)
    else:
        raise Exception('Unsupported ping type')


def check_ipv4(expect_params, packet):
    """Check the IPv4 header of ``packet`` against ``expect_params``.

    Addresses, flags, ToS ('tc') and TTL ('hlim') are only compared when an
    expected value is given and truthy.  The header checksum is always
    verified by recomputing it; as a side effect the checksum field of the
    IP layer in ``packet`` is left set to None.
    """
    src_address = expect_params.get('src_address')
    dst_address = expect_params.get('dst_address')
    flags = expect_params.get('flags')
    tos = expect_params.get('tc')
    ttl = expect_params.get('hlim')
    ip = packet.getlayer(sp.IP)
    LOGGER.debug(f'Packet: {ip}')
    if not ip:
        LOGGER.debug('Packet is not IPv4!')
        return False
    if src_address and ip.src != src_address:
        LOGGER.debug(f'Wrong IPv4 source {ip.src}, expected {src_address}')
        return False
    if dst_address and ip.dst != dst_address:
        LOGGER.debug(f'Wrong IPv4 destination {ip.dst}, expected {dst_address}')
        return False
    # Clearing the field makes scapy fill in a fresh checksum on rebuild.
    chksum = ip.chksum
    ip.chksum = None
    new_chksum = sp.IP(sp.raw(ip)).chksum
    if chksum != new_chksum:
        LOGGER.debug(f'Wrong IPv4 checksum {chksum}, expected {new_chksum}')
        return False
    if flags and ip.flags != flags:
        LOGGER.debug(f'Wrong IP flags value {ip.flags}, expected {flags}')
        return False
    if tos and ip.tos != tos:
        LOGGER.debug(f'Wrong ToS value {ip.tos}, expected {tos}')
        return False
    if ttl and ip.ttl != ttl:
        LOGGER.debug(f'Wrong TTL value {ip.ttl}, expected {ttl}')
        return False
    return True


def check_ipv6(expect_params, packet):
    """Check the IPv6 header of ``packet`` against ``expect_params``.

    Addresses, hop limit and traffic class are only compared when an expected
    value is given and truthy.  Raises Exception when fragmentation flags are
    expected, as IPv6 has none.
    """
    src_address = expect_params.get('src_address')
    dst_address = expect_params.get('dst_address')
    flags = expect_params.get('flags')
    hlim = expect_params.get('hlim')
    tc = expect_params.get('tc')
    ip6 = packet.getlayer(sp.IPv6)
    if not ip6:
        LOGGER.debug('Packet is not IPv6!')
        return False
    if src_address and ip6.src != src_address:
        LOGGER.debug(f'Wrong IPv6 source {ip6.src}, expected {src_address}')
        return False
    if dst_address and ip6.dst != dst_address:
        LOGGER.debug(f'Wrong IPv6 destination {ip6.dst}, expected {dst_address}')
        return False
    # IPv6 has no IP-level checksum.
    if flags:
        raise Exception("There's no fragmentation flags in IPv6")
    if hlim and ip6.hlim != hlim:
        LOGGER.debug(f'Wrong Hop Limit value {ip6.hlim}, expected {hlim}')
        return False
    if tc and ip6.tc != tc:
        LOGGER.debug(f'Wrong TC value {ip6.tc}, expected {tc}')
        return False
    return True


def check_ping_4(expect_params, packet):
    """Check that ``packet`` is IPv4 ICMP carrying the expected payload."""
    expect_length = expect_params['length']
    if not check_ipv4(expect_params, packet):
        return False
    if not packet.getlayer(sp.ICMP):
        LOGGER.debug('Packet is not IPv4 ICMP!')
        return False
    raw = packet.getlayer(sp.Raw)
    if not raw:
        LOGGER.debug('Packet contains no payload!')
        return False
    if raw.load != build_payload(expect_length):
        LOGGER.debug('Payload magic does not match!')
        return False
    return True


def check_ping_request_4(expect_params, packet):
    """Check that ``packet`` is the expected IPv4 ICMP echo request."""
    if not check_ping_4(expect_params, packet):
        return False
    icmp = packet.getlayer(sp.ICMP)
    # .get() so that a type missing from scapy's table is a mismatch rather
    # than a KeyError.
    if sp.icmptypes.get(icmp.type) != 'echo-request':
        LOGGER.debug('Packet is not IPv4 ICMP Echo Request!')
        return False
    return True


def check_ping_reply_4(expect_params, packet):
    """Check that ``packet`` is the expected IPv4 ICMP echo reply."""
    if not check_ping_4(expect_params, packet):
        return False
    icmp = packet.getlayer(sp.ICMP)
    if sp.icmptypes.get(icmp.type) != 'echo-reply':
        LOGGER.debug('Packet is not IPv4 ICMP Echo Reply!')
        return False
    return True


def check_ping_request_6(expect_params, packet):
    """Check that ``packet`` is the expected ICMPv6 echo request."""
    expect_length = expect_params['length']
    if not check_ipv6(expect_params, packet):
        return False
    icmp = packet.getlayer(sp.ICMPv6EchoRequest)
    if not icmp:
        LOGGER.debug('Packet is not IPv6 ICMP Echo Request!')
        return False
    if icmp.data != build_payload(expect_length):
        LOGGER.debug('Payload magic does not match!')
        return False
    return True


def check_ping_reply_6(expect_params, packet):
    """Check that ``packet`` is the expected ICMPv6 echo reply."""
    expect_length = expect_params['length']
    if not check_ipv6(expect_params, packet):
        return False
    icmp = packet.getlayer(sp.ICMPv6EchoReply)
    if not icmp:
        LOGGER.debug('Packet is not IPv6 ICMP Echo Reply!')
        return False
    if icmp.data != build_payload(expect_length):
        LOGGER.debug('Payload magic does not match!')
        return False
    return True


def check_ping_request(args, packet):
    """Sniffer callback: match an ICMP echo request of either IP version."""
    expect_params = args['expect_params']
    if _expects_ipv6(expect_params, 'ping request'):
        return check_ping_request_6(expect_params, packet)
    return check_ping_request_4(expect_params, packet)


def check_ping_reply(args, packet):
    """Sniffer callback: match an ICMP echo reply of either IP version."""
    expect_params = args['expect_params']
    if _expects_ipv6(expect_params, 'ping reply'):
        return check_ping_reply_6(expect_params, packet)
    return check_ping_reply_4(expect_params, packet)


def check_tcp(expect_params, packet):
    """Check the TCP layer of ``packet`` against ``expect_params``.

    The checksum is verified by clearing it and rebuilding the frame; as a
    side effect the checksum field of the TCP layer in ``packet`` is left set
    to None.  'tcp_flags', 'seq' and 'mss' are only compared when given and
    truthy.  For 'SA' packets the expected sequence number is compared with
    ``ack - 1``; for any other flags it is compared with the packet's own
    sequence number.
    """
    tcp_flags = expect_params.get('tcp_flags')
    mss = expect_params.get('mss')
    seq = expect_params.get('seq')
    tcp = packet.getlayer(sp.TCP)
    if not tcp:
        LOGGER.debug('Packet is not TCP!')
        return False
    chksum = tcp.chksum
    tcp.chksum = None
    newpacket = sp.Ether(sp.raw(packet[sp.Ether]))
    new_chksum = newpacket[sp.TCP].chksum
    if new_chksum and chksum != new_chksum:
        LOGGER.debug(f'Wrong TCP checksum {chksum}, expected {new_chksum}!')
        return False
    if tcp_flags and tcp.flags != tcp_flags:
        LOGGER.debug(f'Wrong TCP flags {tcp.flags}, expected {tcp_flags}!')
        return False
    if seq:
        # A SYN+ACK acknowledges our sequence number plus one.
        tcp_seq = tcp.ack - 1 if tcp_flags == 'SA' else tcp.seq
        if seq != tcp_seq:
            LOGGER.debug(f'Wrong TCP Sequence Number {tcp_seq}, expected {seq}')
            return False
    if mss:
        for option in tcp.options:
            if option[0] == 'MSS' and option[1] != mss:
                LOGGER.debug(f'Wrong TCP MSS {option[1]}, expected {mss}')
                return False
    return True


def check_udp(expect_params, packet):
    """Check that ``packet`` is UDP with the expected payload and checksum.

    As a side effect the checksum field of the UDP layer in ``packet`` is
    left set to None.
    """
    expect_length = expect_params['length']
    udp = packet.getlayer(sp.UDP)
    if not udp:
        LOGGER.debug('Packet is not UDP!')
        return False
    raw = packet.getlayer(sp.Raw)
    if not raw:
        LOGGER.debug('Packet contains no payload!')
        return False
    if raw.load != build_payload(expect_length):
        LOGGER.debug(f'Payload magic does not match len {len(raw.load)} vs {expect_length}!')
        return False
    orig_chksum = udp.chksum
    udp.chksum = None
    newpacket = sp.Ether(sp.raw(packet[sp.Ether]))
    new_chksum = newpacket[sp.UDP].chksum
    if new_chksum and orig_chksum != new_chksum:
        LOGGER.debug(f'Wrong UDP checksum {orig_chksum}, expected {new_chksum}!')
        return False

    return True


def check_tcp_syn_request_4(expect_params, packet):
    """Check that ``packet`` is the expected IPv4 TCP SYN."""
    return (
        check_ipv4(expect_params, packet) and
        check_tcp(expect_params | {'tcp_flags': 'S'}, packet)
    )


def check_tcp_syn_reply_4(expect_params, packet):
    """Check that ``packet`` is the expected IPv4 TCP SYN+ACK.

    Takes the same two arguments as check_tcp_syn_reply_6, which is how
    check_tcp_syn_reply calls it.
    """
    return (
        check_ipv4(expect_params, packet) and
        check_tcp(expect_params | {'tcp_flags': 'SA'}, packet)
    )


def _complete_tcp_3way(args, packet, check_ip, ip_layer, prepare_ip):
    """Answer a matching SYN+ACK with the final ACK of the handshake.

    ``check_ip``, ``ip_layer`` and ``prepare_ip`` select the IP version.
    Returns True (after sending the ACK) when ``packet`` is the SYN+ACK
    coming back from the pinged address, False otherwise.
    """
    send_params = args['send_params']

    # The SYN+ACK travels in the opposite direction of the SYN we sent.
    expect_params_sa = clean_params(args['expect_params'])
    expect_params_sa['src_address'] = send_params['dst_address']
    expect_params_sa['dst_address'] = send_params['src_address']

    # Sniff incoming SYN+ACK packet
    if not (
        check_ip(expect_params_sa, packet) and
        check_tcp(expect_params_sa | {'tcp_flags': 'SA'}, packet)
    ):
        return False

    ip_sa = packet.getlayer(ip_layer)
    tcp_sa = packet.getlayer(sp.TCP)
    reply_params = clean_params(send_params)
    reply_params['src_address'] = ip_sa.dst
    reply_params['dst_address'] = ip_sa.src
    tcp_a = sp.TCP(
        sport=tcp_sa.dport, dport=tcp_sa.sport, flags='A',
        seq=tcp_sa.ack, ack=tcp_sa.seq + 1,
    )
    req = sp.Ether() / prepare_ip(reply_params) / tcp_a
    sp.sendp(req, iface=send_params['sendif'], verbose=False)
    return True


def check_tcp_3way_4(args, packet):
    """Match an IPv4 SYN+ACK and complete the handshake with an ACK."""
    return _complete_tcp_3way(args, packet, check_ipv4, sp.IP, prepare_ipv4)


def check_udp_request_4(expect_params, packet):
    """Check that ``packet`` is the expected IPv4 UDP datagram."""
    return (
        check_ipv4(expect_params, packet) and
        check_udp(expect_params, packet)
    )


def check_tcp_syn_request_6(expect_params, packet):
    """Check that ``packet`` is the expected IPv6 TCP SYN."""
    return (
        check_ipv6(expect_params, packet) and
        check_tcp(expect_params | {'tcp_flags': 'S'}, packet)
    )


def check_tcp_syn_reply_6(expect_params, packet):
    """Check that ``packet`` is the expected IPv6 TCP SYN+ACK."""
    return (
        check_ipv6(expect_params, packet) and
        check_tcp(expect_params | {'tcp_flags': 'SA'}, packet)
    )


def check_tcp_3way_6(args, packet):
    """Match an IPv6 SYN+ACK and complete the handshake with an ACK."""
    return _complete_tcp_3way(args, packet, check_ipv6, sp.IPv6, prepare_ipv6)


def check_udp_request_6(expect_params, packet):
    """Check that ``packet`` is the expected IPv6 UDP datagram."""
    return (
        check_ipv6(expect_params, packet) and
        check_udp(expect_params, packet)
    )


def check_tcp_syn_request(args, packet):
    """Sniffer callback: match a TCP SYN of either IP version."""
    expect_params = args['expect_params']
    if _expects_ipv6(expect_params, 'tcp syn request'):
        return check_tcp_syn_request_6(expect_params, packet)
    return check_tcp_syn_request_4(expect_params, packet)


def check_tcp_syn_reply(args, packet):
    """Sniffer callback: match a TCP SYN+ACK of either IP version."""
    expect_params = args['expect_params']
    if _expects_ipv6(expect_params, 'tcp syn reply'):
        return check_tcp_syn_reply_6(expect_params, packet)
    return check_tcp_syn_reply_4(expect_params, packet)


def check_tcp_3way(args, packet):
    """Sniffer callback: match a SYN+ACK and finish the 3-way handshake."""
    if _expects_ipv6(args['expect_params'], 'tcp 3-way handshake'):
        return check_tcp_3way_6(args, packet)
    return check_tcp_3way_4(args, packet)


def check_udp_request(args, packet):
    """Sniffer callback: match a UDP datagram of either IP version."""
    expect_params = args['expect_params']
    if _expects_ipv6(expect_params, 'udp request'):
        return check_udp_request_6(expect_params, packet)
    return check_udp_request_4(expect_params, packet)


def setup_sniffer(
    recvif, ping_type, sniff_type, expect_params, defrag, send_params,
):
    """Create a Sniffer on ``recvif`` for the given ping / sniff type pair.

    The check function receives a dict with the 'send_params' and
    'expect_params' keys as its first argument.  Raises Exception when the
    combination of ``ping_type`` and ``sniff_type`` has no check function.
    """
    checkfns = {
        ('icmp', 'request'): check_ping_request,
        ('icmp', 'reply'): check_ping_reply,
        ('tcpsyn', 'request'): check_tcp_syn_request,
        ('tcpsyn', 'reply'): check_tcp_syn_reply,
        ('tcp3way', 'reply'): check_tcp_3way,
        ('udp', 'request'): check_udp_request,
    }
    checkfn = checkfns.get((ping_type, sniff_type))
    if checkfn is None:
        raise Exception('Unsupported ping and sniff type combination')

    return Sniffer(
        {'send_params': send_params, 'expect_params': expect_params},
        checkfn, recvif, defrag=defrag,
    )


def parse_args():
    """Parse the command line and return the argparse namespace."""
    parser = argparse.ArgumentParser("pft_ping.py",
        description="Ping test tool")

    # Parameters of sent ping request
    parser.add_argument('--sendif', required=True,
        help='The interface through which the packet(s) will be sent')
    parser.add_argument('--to', required=True,
        help='The destination IP address for the ping request')
    parser.add_argument('--ping-type',
        choices=('icmp', 'tcpsyn', 'tcp3way', 'udp'),
        help='Type of ping: ICMP (default), TCP SYN, 3-way TCP handshake or UDP',
        default='icmp')
    parser.add_argument('--fromaddr',
        help='The source IP address for the ping request')

    # Where to look for packets to analyze.
    # The '+' format is ugly as it mixes positional with optional syntax.
    # But we have no positional parameters so I guess it's fine to use it.
    parser.add_argument('--recvif', nargs='+',
        help='The interfaces on which to expect the ping request')
    parser.add_argument('--replyif', nargs='+',
        help='The interfaces which to expect the ping response')

    # Packet settings
    parser_send = parser.add_argument_group('Values set in transmitted packets')
    parser_send.add_argument('--send-flags', type=str,
        help='IPv4 fragmentation flags')
    parser_send.add_argument('--send-frag-length', type=int,
        help='Force IP fragmentation with given fragment length')
    parser_send.add_argument('--send-hlim', type=int,
        help='IPv6 Hop Limit or IPv4 Time To Live')
    parser_send.add_argument('--send-mss', type=int,
        help='TCP Maximum Segment Size')
    parser_send.add_argument('--send-seq', type=int,
        help='TCP sequence number')
    parser_send.add_argument('--send-sport', type=int,
        help='TCP or UDP source port')
    parser_send.add_argument('--send-dport', type=int, default=9,
        help='TCP or UDP destination port')
    parser_send.add_argument('--send-length', type=int, default=len(PAYLOAD_MAGIC),
        help='ICMP Echo Request or UDP payload size')
    parser_send.add_argument('--send-tc', type=int,
        help='IPv6 Traffic Class or IPv4 DiffServ / ToS')
    parser_send.add_argument('--send-tcpopt-unaligned', action='store_true',
        help='Include unaligned TCP options')
    parser_send.add_argument('--send-nop', action='store_true',
        help='Include a NOP IPv4 option')

    # Expectations
    parser_expect = parser.add_argument_group('Values expected in sniffed packets')
    parser_expect.add_argument('--expect-flags', type=str,
        help='IPv4 fragmentation flags')
    parser_expect.add_argument('--expect-hlim', type=int,
        help='IPv6 Hop Limit or IPv4 Time To Live')
    parser_expect.add_argument('--expect-mss', type=int,
        help='TCP Maximum Segment Size')
    parser_expect.add_argument('--expect-seq', type=int,
        help='TCP sequence number')
    parser_expect.add_argument('--expect-tc', type=int,
        help='IPv6 Traffic Class or IPv4 DiffServ / ToS')

    parser.add_argument('-v', '--verbose', action='store_true',
        help=('Enable verbose logging. Apart of potentially useful information '
            'you might see warnings from parsing packets like NDP or other '
            'packets not related to the test being run. Use only when '
            'developing because real tests expect empty stderr and stdout.'))

    return parser.parse_args()


def main():
    """Run the ping test and return the exit status.

    The return value is a bitmask: bit N is set when the N-th installed
    sniffer (receive sniffers first, then reply sniffers) did not capture
    exactly one matching packet.  Raises Exception when neither --recvif nor
    --replyif is given.
    """
    args = parse_args()

    if args.verbose:
        LOGGER.setLevel(logging.DEBUG)

    # Split parameters into send and expect parameters. Parameters might be
    # missing from the command line, always fill the dictionaries with None.
    # Note that falsy values (e.g. 0) are stored as None as well.
    cli_values = vars(args)
    send_params = {}
    expect_params = {}
    for param_name in (
        'flags', 'hlim', 'length', 'mss', 'seq', 'tc', 'frag_length',
        'sport', 'dport',
    ):
        send_params[param_name] = cli_values.get(f'send_{param_name}') or None
        expect_params[param_name] = cli_values.get(f'expect_{param_name}') or None

    expect_params['length'] = send_params['length']
    send_params['tcpopt_unaligned'] = args.send_tcpopt_unaligned
    send_params['nop'] = args.send_nop
    send_params['src_address'] = args.fromaddr if args.fromaddr else None
    send_params['dst_address'] = args.to
    send_params['sendif'] = args.sendif

    # We may not have a default route. Tell scapy where to start looking for routes
    sp.conf.iface6 = args.sendif

    # Configuration sanity checking.
    if not (args.replyif or args.recvif):
        raise Exception('With no reply or recv interface specified no traffic '
            'can be sniffed and verified!'
        )

    # Fragmented pings are passed to the sniffers with the matching defrag
    # mode; unfragmented ones with defrag disabled.
    if send_params['frag_length']:
        src_address = send_params['src_address']
        dst_address = send_params['dst_address']
        if (
            (src_address and ':' in src_address) or
            (dst_address and ':' in dst_address)
        ):
            defrag = 'IPv6'
        else:
            defrag = 'IPv4'
    else:
        defrag = False

    sniffers = []

    if args.recvif:
        sniffer_params = copy(expect_params)
        sniffer_params['src_address'] = None
        sniffer_params['dst_address'] = args.to
        for iface in args.recvif:
            LOGGER.debug(f'Installing receive sniffer on {iface}')
            sniffers.append(
                setup_sniffer(iface, args.ping_type, 'request',
                    sniffer_params, defrag, send_params,
            ))

    if args.replyif:
        sniffer_params = copy(expect_params)
        sniffer_params['src_address'] = args.to
        sniffer_params['dst_address'] = None
        for iface in args.replyif:
            LOGGER.debug(f'Installing reply sniffer on {iface}')
            sniffers.append(
                setup_sniffer(iface, args.ping_type, 'reply',
                    sniffer_params, defrag, send_params,
            ))

    LOGGER.debug(f'Installed {len(sniffers)} sniffers')

    send_ping(args.ping_type, send_params)

    err = 0
    for sniffer_num, sniffer in enumerate(sniffers):
        sniffer.join()
        if sniffer.correctPackets == 1:
            LOGGER.debug(f'Expected ping has been sniffed on {sniffer._recvif}.')
            continue
        # Set a bit in err for each failed sniffer.
        err |= 1 << sniffer_num
        if sniffer.correctPackets > 1:
            LOGGER.debug(f'Duplicated ping has been sniffed on {sniffer._recvif}!')
        else:
            LOGGER.debug(f'Expected ping has not been sniffed on {sniffer._recvif}!')

    return err


if __name__ == '__main__':
    sys.exit(main())