#!/usr/bin/env python3
#
# SPDX-License-Identifier: BSD-2-Clause
#
# Copyright (c) 2017 Kristof Provost <kp@FreeBSD.org>
# Copyright (c) 2023 Kajetan Staszkiewicz <vegeta@tuxpowered.net>
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
# 1. Redistributions of source code must retain the above copyright
#    notice, this list of conditions and the following disclaimer.
# 2. Redistributions in binary form must reproduce the above copyright
#    notice, this list of conditions and the following disclaimer in the
#    documentation and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
# SUCH DAMAGE.
#

import argparse
import logging
# Raise the scapy logger threshold before scapy itself is imported
# (presumably to keep its import-time warnings off stderr).
logging.getLogger("scapy").setLevel(logging.CRITICAL)
import math
import scapy.all as sp
import sys
import socket

from copy import copy
from sniffer import Sniffer

logging.basicConfig(format='%(message)s')
LOGGER = logging.getLogger(__name__)

# Byte pattern repeated to fill the payload of sent packets; the check_*
# functions rebuild the same pattern to recognise the packets again.
PAYLOAD_MAGIC = bytes.fromhex('42c0ffee')


def build_payload(l):
    """Return a payload of exactly ``l`` bytes.

    The payload is PAYLOAD_MAGIC repeated as often as it fits, followed by a
    truncated copy to reach the requested length.  A missing length (None,
    which is what main() stores when the length argument is 0) yields an
    empty payload instead of raising TypeError.
    """
    if not l:
        return b''
    pl = len(PAYLOAD_MAGIC)
    # Integer arithmetic only: no float round-trip as with math.floor(l / pl).
    return PAYLOAD_MAGIC * (l // pl) + PAYLOAD_MAGIC[:l % pl]


def clean_params(params):
    """Return a shallow copy of ``params`` without address and flag keys.

    'src_address', 'dst_address' and 'flags' are removed so that the caller
    can fill in its own values.  Keys that are already absent are ignored.
    """
    ret = copy(params)
    for key in ('src_address', 'dst_address', 'flags'):
        ret.pop(key, None)
    return ret


def prepare_ipv6(send_params):
    """Build the scapy IPv6 header described by ``send_params``.

    Reads 'src_address', 'dst_address', 'hlim', 'tc' and 'fl'.  Optional
    fields are only assigned when truthy, so None or 0 leaves scapy's default.
    """
    src_address = send_params.get('src_address')
    dst_address = send_params.get('dst_address')
    hlim = send_params.get('hlim')
    tc = send_params.get('tc')
    fl = send_params.get('fl')
    ip6 = sp.IPv6(dst=dst_address)
    if src_address:
        ip6.src = src_address
    if hlim:
        ip6.hlim = hlim
    if tc:
        ip6.tc = tc
    if fl:
        ip6.fl = fl
    return ip6


def prepare_ipv4(send_params):
    """Build the scapy IPv4 header described by ``send_params``.

    Reads 'src_address', 'dst_address', 'flags', 'tc' (used as ToS), 'hlim'
    (used as TTL) and 'nop'.  Optional fields are only assigned when truthy,
    so None or 0 leaves scapy's default.
    """
    src_address = send_params.get('src_address')
    dst_address = send_params.get('dst_address')
    flags = send_params.get('flags')
    tos = send_params.get('tc')
    ttl = send_params.get('hlim')
    opt = send_params.get('nop')
    # NOTE(review): the option byte sent for 'nop' is 0x00.  RFC 791 assigns
    # type 0 to End of Option List and type 1 to No Operation; left unchanged
    # because existing tests may depend on this exact byte -- confirm.
    options = '\x00' if opt else ''
    ip = sp.IP(dst=dst_address, options=options)
    if src_address:
        ip.src = src_address
    if flags:
        ip.flags = flags
    if tos:
        ip.tos = tos
    if ttl:
        ip.ttl = ttl
    return ip


def send_icmp_ping(send_params):
    """Send an ICMP (IPv4) or ICMPv6 echo request on ``send_params['sendif']``.

    The address family is chosen by the presence of ':' in 'dst_address'.
    The payload is 'length' bytes of PAYLOAD_MAGIC pattern.  When
    'frag_length' is set the packet is fragmented before sending.
    """
    send_length = send_params['length']
    send_frag_length = send_params['frag_length']
    packets = []
    ether = sp.Ether()
    if ':' in send_params['dst_address']:
        ip6 = prepare_ipv6(send_params)
        icmp = sp.ICMPv6EchoRequest(data=sp.raw(build_payload(send_length)))
        if send_frag_length:
            for packet in sp.fragment6(ip6 / icmp, fragSize=send_frag_length):
                packets.append(ether / packet)
        else:
            packets.append(ether / ip6 / icmp)
    else:
        ip = prepare_ipv4(send_params)
        icmp = sp.ICMP(type='echo-request')
        raw = sp.raw(build_payload(send_length))
        if send_frag_length:
            for packet in sp.fragment(ip / icmp / raw, fragsize=send_frag_length):
                packets.append(ether / packet)
        else:
            packets.append(ether / ip / icmp / raw)
    for packet in packets:
        sp.sendp(packet, iface=send_params['sendif'], verbose=False)


def send_tcp_syn(send_params):
    """Send a single TCP SYN on ``send_params['sendif']``.

    TCP options are Timestamp (1, 1) and MSS ('mss', or 1280 when not given).
    With 'tcpopt_unaligned' a NOP option is prepended to the option list.
    """
    tcpopt_unaligned = send_params.get('tcpopt_unaligned')
    seq = send_params.get('seq')
    mss = send_params.get('mss')
    ether = sp.Ether()
    opts = [('Timestamp', (1, 1)), ('MSS', mss if mss else 1280)]
    if tcpopt_unaligned:
        opts = [('NOP', 0)] + opts
    if ':' in send_params['dst_address']:
        ip = prepare_ipv6(send_params)
    else:
        ip = prepare_ipv4(send_params)
    tcp = sp.TCP(
        sport=send_params.get('sport'), dport=send_params.get('dport'),
        flags='S', options=opts, seq=seq,
    )
    req = ether / ip / tcp
    sp.sendp(req, iface=send_params['sendif'], verbose=False)


def send_udp(send_params):
    """Send a UDP datagram on ``send_params['sendif']``.

    The payload is 'length' bytes of PAYLOAD_MAGIC pattern.  When
    'frag_length' is set the packet is fragmented before sending.
    """
    LOGGER.debug('Sending UDP ping')
    packets = []
    send_length = send_params['length']
    send_frag_length = send_params['frag_length']
    ether = sp.Ether()
    # The UDP header and payload are built the same way for both families.
    udp = sp.UDP(
        sport=send_params.get('sport'), dport=send_params.get('dport'),
    )
    raw = sp.Raw(load=build_payload(send_length))
    if ':' in send_params['dst_address']:
        ip6 = prepare_ipv6(send_params)
        if send_frag_length:
            for packet in sp.fragment6(ip6 / udp / raw, fragSize=send_frag_length):
                packets.append(ether / packet)
        else:
            packets.append(ether / ip6 / udp / raw)
    else:
        ip = prepare_ipv4(send_params)
        if send_frag_length:
            for packet in sp.fragment(ip / udp / raw, fragsize=send_frag_length):
                packets.append(ether / packet)
        else:
            packets.append(ether / ip / udp / raw)

    for packet in packets:
        sp.sendp(packet, iface=send_params['sendif'], verbose=False)


def send_ping(ping_type, send_params):
    """Dispatch to the sender for ``ping_type``.

    'tcp3way' starts with the same SYN as 'tcpsyn'.  Raises Exception for an
    unknown ping type.
    """
    if ping_type == 'icmp':
        send_icmp_ping(send_params)
    elif ping_type in ('tcpsyn', 'tcp3way'):
        send_tcp_syn(send_params)
    elif ping_type == 'udp':
        send_udp(send_params)
    else:
        raise Exception('Unsupported ping type')


def check_ipv4(expect_params, packet):
    """Return True when ``packet`` has an IPv4 header matching expectations.

    Only expectations with a truthy value are compared: 'src_address',
    'dst_address', 'flags', 'tc' (ToS) and 'hlim' (TTL).
    """
    src_address = expect_params.get('src_address')
    dst_address = expect_params.get('dst_address')
    flags = expect_params.get('flags')
    tos = expect_params.get('tc')
    ttl = expect_params.get('hlim')
    ip = packet.getlayer(sp.IP)
    LOGGER.debug(f'Packet: {ip}')
    if not ip:
        LOGGER.debug('Packet is not IPv4!')
        return False
    if src_address and ip.src != src_address:
        LOGGER.debug(f'Wrong IPv4 source {ip.src}, expected {src_address}')
        return False
    if dst_address and ip.dst != dst_address:
        LOGGER.debug(f'Wrong IPv4 destination {ip.dst}, expected {dst_address}')
        return False
    if flags and ip.flags != flags:
        LOGGER.debug(f'Wrong IP flags value {ip.flags}, expected {flags}')
        return False
    if tos and ip.tos != tos:
        LOGGER.debug(f'Wrong ToS value {ip.tos}, expected {tos}')
        return False
    if ttl and ip.ttl != ttl:
        LOGGER.debug(f'Wrong TTL value {ip.ttl}, expected {ttl}')
        return False
    return True


def check_ipv6(expect_params, packet):
    """Return True when ``packet`` has an IPv6 header matching expectations.

    Only expectations with a truthy value are compared: 'src_address',
    'dst_address', 'hlim', 'tc' and 'fl'.  Addresses are compared in binary
    form so that different textual spellings of one address still match.

    Raises Exception when 'flags' is expected, since that expectation cannot
    be checked on an IPv6 header.
    """
    src_address = expect_params.get('src_address')
    dst_address = expect_params.get('dst_address')
    flags = expect_params.get('flags')
    hlim = expect_params.get('hlim')
    tc = expect_params.get('tc')
    fl = expect_params.get('fl')
    ip6 = packet.getlayer(sp.IPv6)
    if not ip6:
        LOGGER.debug('Packet is not IPv6!')
        return False
    if src_address and socket.inet_pton(socket.AF_INET6, ip6.src) != \
            socket.inet_pton(socket.AF_INET6, src_address):
        LOGGER.debug(f'Wrong IPv6 source {ip6.src}, expected {src_address}')
        return False
    if dst_address and socket.inet_pton(socket.AF_INET6, ip6.dst) != \
            socket.inet_pton(socket.AF_INET6, dst_address):
        LOGGER.debug(f'Wrong IPv6 destination {ip6.dst}, expected {dst_address}')
        return False
    # Expecting IPv4-style fragmentation flags on IPv6 is a usage error.
    if flags:
        raise Exception("There's no fragmentation flags in IPv6")
    if hlim and ip6.hlim != hlim:
        LOGGER.debug(f'Wrong Hop Limit value {ip6.hlim}, expected {hlim}')
        return False
    if tc and ip6.tc != tc:
        LOGGER.debug(f'Wrong TC value {ip6.tc}, expected {tc}')
        return False
    if fl and ip6.fl != fl:
        LOGGER.debug(f'Wrong Flow Label value {ip6.fl}, expected {fl}')
        return False
    return True


def check_ping_4(expect_params, packet):
    """Return True for an IPv4 ICMP packet carrying the expected payload.

    Checks the IPv4 header, the presence of an ICMP layer and that the raw
    payload equals build_payload(expect_params['length']).  The ICMP type is
    left to the callers.
    """
    expect_length = expect_params['length']
    if not check_ipv4(expect_params, packet):
        return False
    icmp = packet.getlayer(sp.ICMP)
    if not icmp:
        LOGGER.debug('Packet is not IPv4 ICMP!')
        return False
    raw = packet.getlayer(sp.Raw)
    if not raw:
        LOGGER.debug('Packet contains no payload!')
        return False
    if raw.load != build_payload(expect_length):
        LOGGER.debug('Payload magic does not match!')
        return False
    return True


def check_ping_request_4(expect_params, packet):
    """Return True when ``packet`` is the expected IPv4 ICMP echo request."""
    if not check_ping_4(expect_params, packet):
        return False
    icmp = packet.getlayer(sp.ICMP)
    if sp.icmptypes[icmp.type] != 'echo-request':
        LOGGER.debug('Packet is not IPv4 ICMP Echo Request!')
        return False
    return True


def check_ping_reply_4(expect_params, packet):
    """Return True when ``packet`` is the expected IPv4 ICMP echo reply."""
    if not check_ping_4(expect_params, packet):
        return False
    icmp = packet.getlayer(sp.ICMP)
    if sp.icmptypes[icmp.type] != 'echo-reply':
        LOGGER.debug('Packet is not IPv4 ICMP Echo Reply!')
        return False
    return True


def check_ping_request_6(expect_params, packet):
    """Return True when ``packet`` is the expected ICMPv6 echo request."""
    expect_length = expect_params['length']
    if not check_ipv6(expect_params, packet):
        return False
    icmp = packet.getlayer(sp.ICMPv6EchoRequest)
    if not icmp:
        LOGGER.debug('Packet is not IPv6 ICMP Echo Request!')
        return False
    if icmp.data != build_payload(expect_length):
        LOGGER.debug('Payload magic does not match!')
        return False
    return True


def check_ping_reply_6(expect_params, packet):
    """Return True when ``packet`` is the expected ICMPv6 echo reply."""
    expect_length = expect_params['length']
    if not check_ipv6(expect_params, packet):
        return False
    icmp = packet.getlayer(sp.ICMPv6EchoReply)
    if not icmp:
        LOGGER.debug('Packet is not IPv6 ICMP Echo Reply!')
        return False
    if icmp.data != build_payload(expect_length):
        LOGGER.debug('Payload magic does not match!')
        return False
    return True


def check_ping_request(args, packet):
    """Check an echo request, picking IPv4 or IPv6 from the expected addresses.

    ``args`` is the dict given to the Sniffer; only 'expect_params' is read.
    Raises Exception when neither a source nor a destination address is
    expected, because the address family cannot be determined then.
    """
    expect_params = args['expect_params']
    src_address = expect_params.get('src_address')
    dst_address = expect_params.get('dst_address')
    if not (src_address or dst_address):
        raise Exception('Source or destination address must be given to match the ping request!')
    if (
        (src_address and ':' in src_address) or
        (dst_address and ':' in dst_address)
    ):
        return check_ping_request_6(expect_params, packet)
    return check_ping_request_4(expect_params, packet)


def check_ping_reply(args, packet):
    """Check an echo reply, picking IPv4 or IPv6 from the expected addresses.

    ``args`` is the dict given to the Sniffer; only 'expect_params' is read.
    Raises Exception when neither a source nor a destination address is
    expected, because the address family cannot be determined then.
    """
    expect_params = args['expect_params']
    src_address = expect_params.get('src_address')
    dst_address = expect_params.get('dst_address')
    if not (src_address or dst_address):
        raise Exception('Source or destination address must be given to match the ping reply!')
    if (
        (src_address and ':' in src_address) or
        (dst_address and ':' in dst_address)
    ):
        return check_ping_reply_6(expect_params, packet)
    return check_ping_reply_4(expect_params, packet)


def check_tcp(expect_params, packet):
    """Return True when the TCP layer of ``packet`` matches expectations.

    Verifies the TCP checksum and, when given, 'tcp_flags', 'seq' and 'mss'.
    The expected 'seq' is the sequence number of the SYN that was sent: for
    a SYN+ACK ('SA') it is compared against ack - 1, otherwise against the
    packet's own sequence number.
    """
    tcp_flags = expect_params.get('tcp_flags')
    mss = expect_params.get('mss')
    seq = expect_params.get('seq')
    tcp = packet.getlayer(sp.TCP)
    if not tcp:
        LOGGER.debug('Packet is not TCP!')
        return False
    # Clear the checksum and re-parse the rebuilt bytes so that scapy
    # recomputes it.  The sniffed packet keeps chksum = None afterwards.
    chksum = tcp.chksum
    tcp.chksum = None
    newpacket = sp.Ether(sp.raw(packet[sp.Ether]))
    new_chksum = newpacket[sp.TCP].chksum
    if new_chksum and chksum != new_chksum:
        LOGGER.debug(f'Wrong TCP checksum {chksum}, expected {new_chksum}!')
        return False
    if tcp_flags and tcp.flags != tcp_flags:
        LOGGER.debug(f'Wrong TCP flags {tcp.flags}, expected {tcp_flags}!')
        return False
    if seq:
        # Any flag expectation other than 'SA' (including none at all) uses
        # the packet's own sequence number; this keeps tcp_seq always bound.
        if tcp_flags == 'SA':
            tcp_seq = tcp.ack - 1
        else:
            tcp_seq = tcp.seq
        if seq != tcp_seq:
            LOGGER.debug(f'Wrong TCP Sequence Number {tcp_seq}, expected {seq}')
            return False
    if mss:
        # A packet without an MSS option passes; only a differing one fails.
        for option in tcp.options:
            if option[0] == 'MSS':
                if option[1] != mss:
                    LOGGER.debug(f'Wrong TCP MSS {option[1]}, expected {mss}')
                    return False
    return True


def check_udp(expect_params, packet):
    """Return True when ``packet`` carries the expected UDP payload.

    Checks for a UDP layer, that the raw payload equals
    build_payload(expect_params['length']) and that the UDP checksum is valid.
    """
    expect_length = expect_params['length']
    udp = packet.getlayer(sp.UDP)
    if not udp:
        LOGGER.debug('Packet is not UDP!')
        return False
    raw = packet.getlayer(sp.Raw)
    if not raw:
        LOGGER.debug('Packet contains no payload!')
        return False
    if raw.load != build_payload(expect_length):
        LOGGER.debug(f'Payload magic does not match len {len(raw.load)} vs {expect_length}!')
        return False
    # Same recompute-and-compare approach as in check_tcp(); the sniffed
    # packet keeps chksum = None afterwards.
    orig_chksum = udp.chksum
    udp.chksum = None
    newpacket = sp.Ether(sp.raw(packet[sp.Ether]))
    new_chksum = newpacket[sp.UDP].chksum
    if new_chksum and orig_chksum != new_chksum:
        LOGGER.debug(f'Wrong UDP checksum {orig_chksum}, expected {new_chksum}!')
        return False

    return True


def check_tcp_syn_request_4(expect_params, packet):
    """Return True when ``packet`` is the expected IPv4 TCP SYN."""
    if not check_ipv4(expect_params, packet):
        return False
    if not check_tcp(expect_params | {'tcp_flags': 'S'}, packet):
        return False
    return True


def check_tcp_syn_reply_4(expect_params, packet):
    """Return True when ``packet`` is the expected IPv4 TCP SYN+ACK.

    Takes (expect_params, packet) like check_tcp_syn_reply_6(), which is how
    check_tcp_syn_reply() calls it.  The former extra leading 'send_params'
    parameter was never used and made that call raise TypeError.
    """
    if not check_ipv4(expect_params, packet):
        return False
    if not check_tcp(expect_params | {'tcp_flags': 'SA'}, packet):
        return False
    return True


def check_tcp_3way_4(args, packet):
    """Match an IPv4 SYN+ACK and answer it with the final ACK.

    The SYN+ACK is expected from the address the SYN was sent to, towards the
    address it was sent from.  On a match the ACK completing the handshake is
    sent on send_params['sendif'] and True is returned; otherwise False.
    """
    send_params = args['send_params']

    expect_params_sa = clean_params(args['expect_params'])
    expect_params_sa['src_address'] = send_params['dst_address']
    expect_params_sa['dst_address'] = send_params['src_address']

    # Sniff incoming SYN+ACK packet
    if (
        check_ipv4(expect_params_sa, packet) and
        check_tcp(expect_params_sa | {'tcp_flags': 'SA'}, packet)
    ):
        ether = sp.Ether()
        ip_sa = packet.getlayer(sp.IP)
        tcp_sa = packet.getlayer(sp.TCP)
        # Reply with addresses and ports swapped relative to the SYN+ACK.
        reply_params = clean_params(send_params)
        reply_params['src_address'] = ip_sa.dst
        reply_params['dst_address'] = ip_sa.src
        ip_a = prepare_ipv4(reply_params)
        tcp_a = sp.TCP(
            sport=tcp_sa.dport, dport=tcp_sa.sport, flags='A',
            seq=tcp_sa.ack, ack=tcp_sa.seq + 1,
        )
        req = ether / ip_a / tcp_a
        sp.sendp(req, iface=send_params['sendif'], verbose=False)
        return True

    return False


def check_udp_request_4(expect_params, packet):
    """Return True when ``packet`` is the expected IPv4 UDP datagram."""
    if not check_ipv4(expect_params, packet):
        return False
    if not check_udp(expect_params, packet):
        return False
    return True


def check_tcp_syn_request_6(expect_params, packet):
    """Return True when ``packet`` is the expected IPv6 TCP SYN."""
    if not check_ipv6(expect_params, packet):
        return False
    if not check_tcp(expect_params | {'tcp_flags': 'S'}, packet):
        return False
    return True


def check_tcp_syn_reply_6(expect_params, packet):
    """Return True when ``packet`` is the expected IPv6 TCP SYN+ACK."""
    if not check_ipv6(expect_params, packet):
        return False
    if not check_tcp(expect_params | {'tcp_flags': 'SA'}, packet):
        return False
    return True

480 481def check_tcp_3way_6(args, packet): 482 send_params = args['send_params'] 483 484 expect_params_sa = clean_params(args['expect_params']) 485 expect_params_sa['src_address'] = send_params['dst_address'] 486 expect_params_sa['dst_address'] = send_params['src_address'] 487 488 # Sniff incoming SYN+ACK packet 489 if ( 490 check_ipv6(expect_params_sa, packet) and 491 check_tcp(expect_params_sa | {'tcp_flags': 'SA'}, packet) 492 ): 493 ether = sp.Ether() 494 ip6_sa = packet.getlayer(sp.IPv6) 495 tcp_sa = packet.getlayer(sp.TCP) 496 reply_params = clean_params(send_params) 497 reply_params['src_address'] = ip6_sa.dst 498 reply_params['dst_address'] = ip6_sa.src 499 ip_a = prepare_ipv6(reply_params) 500 tcp_a = sp.TCP( 501 sport=tcp_sa.dport, dport=tcp_sa.sport, flags='A', 502 seq=tcp_sa.ack, ack=tcp_sa.seq + 1, 503 ) 504 req = ether / ip_a / tcp_a 505 sp.sendp(req, iface=send_params['sendif'], verbose=False) 506 return True 507 508 return False 509 510 511def check_udp_request_6(expect_params, packet): 512 if not check_ipv6(expect_params, packet): 513 return False 514 if not check_udp(expect_params, packet): 515 return False 516 return True 517 518def check_tcp_syn_request(args, packet): 519 expect_params = args['expect_params'] 520 src_address = expect_params.get('src_address') 521 dst_address = expect_params.get('dst_address') 522 if not (src_address or dst_address): 523 raise Exception('Source or destination address must be given to match the tcp syn request!') 524 if ( 525 (src_address and ':' in src_address) or 526 (dst_address and ':' in dst_address) 527 ): 528 return check_tcp_syn_request_6(expect_params, packet) 529 else: 530 return check_tcp_syn_request_4(expect_params, packet) 531 532 533def check_tcp_syn_reply(args, packet): 534 expect_params = args['expect_params'] 535 src_address = expect_params.get('src_address') 536 dst_address = expect_params.get('dst_address') 537 if not (src_address or dst_address): 538 raise Exception('Source or destination 
address must be given to match the tcp syn reply!') 539 if ( 540 (src_address and ':' in src_address) or 541 (dst_address and ':' in dst_address) 542 ): 543 return check_tcp_syn_reply_6(expect_params, packet) 544 else: 545 return check_tcp_syn_reply_4(expect_params, packet) 546 547def check_tcp_3way(args, packet): 548 expect_params = args['expect_params'] 549 src_address = expect_params.get('src_address') 550 dst_address = expect_params.get('dst_address') 551 if not (src_address or dst_address): 552 raise Exception('Source or destination address must be given to match the tcp syn reply!') 553 if ( 554 (src_address and ':' in src_address) or 555 (dst_address and ':' in dst_address) 556 ): 557 return check_tcp_3way_6(args, packet) 558 else: 559 return check_tcp_3way_4(args, packet) 560 561 562def check_udp_request(args, packet): 563 expect_params = args['expect_params'] 564 src_address = expect_params.get('src_address') 565 dst_address = expect_params.get('dst_address') 566 if not (src_address or dst_address): 567 raise Exception('Source or destination address must be given to match the tcp syn request!') 568 if ( 569 (src_address and ':' in src_address) or 570 (dst_address and ':' in dst_address) 571 ): 572 return check_udp_request_6(expect_params, packet) 573 else: 574 return check_udp_request_4(expect_params, packet) 575 576 577def setup_sniffer( 578 recvif, ping_type, sniff_type, expect_params, defrag, send_params, 579): 580 if ping_type == 'icmp' and sniff_type == 'request': 581 checkfn = check_ping_request 582 elif ping_type == 'icmp' and sniff_type == 'reply': 583 checkfn = check_ping_reply 584 elif ping_type == 'tcpsyn' and sniff_type == 'request': 585 checkfn = check_tcp_syn_request 586 elif ping_type == 'tcpsyn' and sniff_type == 'reply': 587 checkfn = check_tcp_syn_reply 588 elif ping_type == 'tcp3way' and sniff_type == 'reply': 589 checkfn = check_tcp_3way 590 elif ping_type == 'udp' and sniff_type == 'request': 591 checkfn = check_udp_request 592 else: 
593 raise Exception('Unspported ping and sniff type combination') 594 595 return Sniffer( 596 {'send_params': send_params, 'expect_params': expect_params}, 597 checkfn, recvif, defrag=defrag, 598 ) 599 600 601def parse_args(): 602 parser = argparse.ArgumentParser("pft_ping.py", 603 description="Ping test tool") 604 605 # Parameters of sent ping request 606 parser.add_argument('--sendif', required=True, 607 help='The interface through which the packet(s) will be sent') 608 parser.add_argument('--to', required=True, 609 help='The destination IP address for the ping request') 610 parser.add_argument('--ping-type', 611 choices=('icmp', 'tcpsyn', 'tcp3way', 'udp'), 612 help='Type of ping: ICMP (default) or TCP SYN or 3-way TCP handshake', 613 default='icmp') 614 parser.add_argument('--fromaddr', 615 help='The source IP address for the ping request') 616 617 # Where to look for packets to analyze. 618 # The '+' format is ugly as it mixes positional with optional syntax. 619 # But we have no positional parameters so I guess it's fine to use it. 
620 parser.add_argument('--recvif', nargs='+', 621 help='The interfaces on which to expect the ping request') 622 parser.add_argument('--replyif', nargs='+', 623 help='The interfaces which to expect the ping response') 624 625 # Packet settings 626 parser_send = parser.add_argument_group('Values set in transmitted packets') 627 parser_send.add_argument('--send-flags', type=str, 628 help='IPv4 fragmentation flags') 629 parser_send.add_argument('--send-frag-length', type=int, 630 help='Force IP fragmentation with given fragment length') 631 parser_send.add_argument('--send-hlim', type=int, 632 help='IPv6 Hop Limit or IPv4 Time To Live') 633 parser_send.add_argument('--send-mss', type=int, 634 help='TCP Maximum Segment Size') 635 parser_send.add_argument('--send-seq', type=int, 636 help='TCP sequence number') 637 parser_send.add_argument('--send-sport', type=int, 638 help='TCP source port') 639 parser_send.add_argument('--send-dport', type=int, default=9, 640 help='TCP destination port') 641 parser_send.add_argument('--send-length', type=int, default=len(PAYLOAD_MAGIC), 642 help='ICMP Echo Request payload size') 643 parser_send.add_argument('--send-tc', type=int, 644 help='IPv6 Traffic Class or IPv4 DiffServ / ToS') 645 parser_send.add_argument('--send-fl', type=int, 646 help='IPv6 Flow label') 647 parser_send.add_argument('--send-tcpopt-unaligned', action='store_true', 648 help='Include unaligned TCP options') 649 parser_send.add_argument('--send-nop', action='store_true', 650 help='Include a NOP IPv4 option') 651 652 # Expectations 653 parser_expect = parser.add_argument_group('Values expected in sniffed packets') 654 parser_expect.add_argument('--expect-flags', type=str, 655 help='IPv4 fragmentation flags') 656 parser_expect.add_argument('--expect-hlim', type=int, 657 help='IPv6 Hop Limit or IPv4 Time To Live') 658 parser_expect.add_argument('--expect-mss', type=int, 659 help='TCP Maximum Segment Size') 660 parser_send.add_argument('--expect-seq', type=int, 661 
help='TCP sequence number') 662 parser_expect.add_argument('--expect-tc', type=int, 663 help='IPv6 Traffic Class or IPv4 DiffServ / ToS') 664 parser_expect.add_argument('--expect-fl', type=int, 665 help='IPv6 Flow Label') 666 667 parser.add_argument('-v', '--verbose', action='store_true', 668 help=('Enable verbose logging. Apart of potentially useful information ' 669 'you might see warnings from parsing packets like NDP or other ' 670 'packets not related to the test being run. Use only when ' 671 'developing because real tests expect empty stderr and stdout.')) 672 673 return parser.parse_args() 674 675 676def main(): 677 args = parse_args() 678 679 if args.verbose: 680 LOGGER.setLevel(logging.DEBUG) 681 682 # Split parameters into send and expect parameters. Parameters might be 683 # missing from the command line, always fill the dictionaries with None. 684 send_params = {} 685 expect_params = {} 686 for param_name in ( 687 'flags', 'hlim', 'length', 'mss', 'seq', 'tc', 'fl', 'frag_length', 688 'sport', 'dport', 689 ): 690 param_arg = vars(args).get(f'send_{param_name}') 691 send_params[param_name] = param_arg if param_arg else None 692 param_arg = vars(args).get(f'expect_{param_name}') 693 expect_params[param_name] = param_arg if param_arg else None 694 695 expect_params['length'] = send_params['length'] 696 send_params['tcpopt_unaligned'] = args.send_tcpopt_unaligned 697 send_params['nop'] = args.send_nop 698 send_params['src_address'] = args.fromaddr if args.fromaddr else None 699 send_params['dst_address'] = args.to 700 send_params['sendif'] = args.sendif 701 702 # We may not have a default route. Tell scapy where to start looking for routes 703 sp.conf.iface6 = args.sendif 704 705 # Configuration sanity checking. 706 if not (args.replyif or args.recvif): 707 raise Exception('With no reply or recv interface specified no traffic ' 708 'can be sniffed and verified!' 
709 ) 710 711 sniffers = [] 712 713 if send_params['frag_length']: 714 if ( 715 (send_params['src_address'] and ':' in send_params['src_address']) or 716 (send_params['dst_address'] and ':' in send_params['dst_address']) 717 ): 718 defrag = 'IPv6' 719 else: 720 defrag = 'IPv4' 721 else: 722 defrag = False 723 724 if args.recvif: 725 sniffer_params = copy(expect_params) 726 sniffer_params['src_address'] = None 727 sniffer_params['dst_address'] = args.to 728 for iface in args.recvif: 729 LOGGER.debug(f'Installing receive sniffer on {iface}') 730 sniffers.append( 731 setup_sniffer(iface, args.ping_type, 'request', 732 sniffer_params, defrag, send_params, 733 )) 734 735 if args.replyif: 736 sniffer_params = copy(expect_params) 737 sniffer_params['src_address'] = args.to 738 sniffer_params['dst_address'] = None 739 for iface in args.replyif: 740 LOGGER.debug(f'Installing reply sniffer on {iface}') 741 sniffers.append( 742 setup_sniffer(iface, args.ping_type, 'reply', 743 sniffer_params, defrag, send_params, 744 )) 745 746 LOGGER.debug(f'Installed {len(sniffers)} sniffers') 747 748 send_ping(args.ping_type, send_params) 749 750 err = 0 751 sniffer_num = 0 752 for sniffer in sniffers: 753 sniffer.join() 754 if sniffer.correctPackets == 1: 755 LOGGER.debug(f'Expected ping has been sniffed on {sniffer._recvif}.') 756 else: 757 # Set a bit in err for each failed sniffer. 758 err |= 1<<sniffer_num 759 if sniffer.correctPackets > 1: 760 LOGGER.debug(f'Duplicated ping has been sniffed on {sniffer._recvif}!') 761 else: 762 LOGGER.debug(f'Expected ping has not been sniffed on {sniffer._recvif}!') 763 sniffer_num += 1 764 765 return err 766 767 768if __name__ == '__main__': 769 sys.exit(main()) 770