1#!/usr/bin/env python3 2# 3# SPDX-License-Identifier: BSD-2-Clause 4# 5# Copyright (c) 2017 Kristof Provost <kp@FreeBSD.org> 6# Copyright (c) 2023 Kajetan Staszkiewicz <vegeta@tuxpowered.net> 7# 8# Redistribution and use in source and binary forms, with or without 9# modification, are permitted provided that the following conditions 10# are met: 11# 1. Redistributions of source code must retain the above copyright 12# notice, this list of conditions and the following disclaimer. 13# 2. Redistributions in binary form must reproduce the above copyright 14# notice, this list of conditions and the following disclaimer in the 15# documentation and/or other materials provided with the distribution. 16# 17# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND 18# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 19# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 20# ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE 21# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL 22# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS 23# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) 24# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 25# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY 26# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 27# SUCH DAMAGE. 28# 29 30import argparse 31import logging 32logging.getLogger("scapy").setLevel(logging.CRITICAL) 33import math 34import scapy.all as sp 35import sys 36import socket 37 38from copy import copy 39from sniffer import Sniffer 40 41logging.basicConfig(format='%(message)s') 42LOGGER = logging.getLogger(__name__) 43 44PAYLOAD_MAGIC = bytes.fromhex('42c0ffee') 45 46def build_payload(l): 47 pl = len(PAYLOAD_MAGIC) 48 ret = PAYLOAD_MAGIC * math.floor(l/pl) 49 ret += PAYLOAD_MAGIC[0:(l % pl)] 50 return ret 51 52 53def clean_params(params): 54 # Prepare a copy of safe copy of params 55 ret = copy(params) 56 ret.pop('src_address') 57 ret.pop('dst_address') 58 ret.pop('flags') 59 return ret 60 61def prepare_ipv6(send_params): 62 src_address = send_params.get('src_address') 63 dst_address = send_params.get('dst_address') 64 hlim = send_params.get('hlim') 65 tc = send_params.get('tc') 66 ip6 = sp.IPv6(dst=dst_address) 67 if src_address: 68 ip6.src = src_address 69 if hlim: 70 ip6.hlim = hlim 71 if tc: 72 ip6.tc = tc 73 return ip6 74 75 76def prepare_ipv4(send_params): 77 src_address = send_params.get('src_address') 78 dst_address = send_params.get('dst_address') 79 flags = send_params.get('flags') 80 tos = send_params.get('tc') 81 ttl = send_params.get('hlim') 82 opt = send_params.get('nop') 83 options = '' 84 if opt: 85 options='\x00' 86 ip = sp.IP(dst=dst_address, options=options) 87 if src_address: 88 ip.src = src_address 89 if flags: 90 ip.flags = flags 91 if tos: 92 ip.tos = tos 93 if ttl: 94 ip.ttl = ttl 95 return ip 96 97 98def send_icmp_ping(send_params): 99 send_length = send_params['length'] 100 send_frag_length = send_params['frag_length'] 101 packets = [] 102 ether = sp.Ether() 103 if ':' in send_params['dst_address']: 104 ip6 = prepare_ipv6(send_params) 105 icmp = sp.ICMPv6EchoRequest(data=sp.raw(build_payload(send_length))) 106 if send_frag_length: 107 for packet in sp.fragment6(ip6 / icmp, fragSize=send_frag_length): 108 packets.append(ether / packet) 109 else: 110 packets.append(ether / ip6 / icmp) 111 112 else: 113 ip = prepare_ipv4(send_params) 114 icmp = sp.ICMP(type='echo-request') 115 raw = sp.raw(build_payload(send_length)) 116 if send_frag_length: 117 for packet in sp.fragment(ip / icmp / raw, fragsize=send_frag_length): 118 packets.append(ether / packet) 119 else: 120 packets.append(ether / ip / icmp / raw) 121 for packet in packets: 122 sp.sendp(packet, iface=send_params['sendif'], verbose=False) 123 124 125def send_tcp_syn(send_params): 126 tcpopt_unaligned = send_params.get('tcpopt_unaligned') 127 seq = send_params.get('seq') 128 mss = send_params.get('mss') 129 ether = sp.Ether() 130 opts=[('Timestamp', (1, 1)), ('MSS', mss if mss else 1280)] 131 if tcpopt_unaligned: 132 opts = [('NOP', 0 )] + opts 133 if ':' in send_params['dst_address']: 134 ip = prepare_ipv6(send_params) 135 else: 136 ip = prepare_ipv4(send_params) 137 tcp = sp.TCP( 138 sport=send_params.get('sport'), dport=send_params.get('dport'), 139 flags='S', options=opts, seq=seq, 140 ) 141 req = ether / ip / tcp 142 sp.sendp(req, iface=send_params['sendif'], verbose=False) 143 144 145def send_udp(send_params): 146 LOGGER.debug(f'Sending UDP ping') 147 packets = [] 148 send_length = send_params['length'] 149 send_frag_length = send_params['frag_length'] 150 ether = sp.Ether() 151 if ':' in send_params['dst_address']: 152 ip6 = prepare_ipv6(send_params) 153 udp = sp.UDP( 154 sport=send_params.get('sport'), dport=send_params.get('dport'), 155 ) 156 raw = sp.Raw(load=build_payload(send_length)) 157 if send_frag_length: 158 for packet in sp.fragment6(ip6 / udp / raw, fragSize=send_frag_length): 159 packets.append(ether / packet) 160 else: 161 packets.append(ether / ip6 / udp / raw) 162 else: 163 ip = prepare_ipv4(send_params) 164 udp = sp.UDP( 165 sport=send_params.get('sport'), dport=send_params.get('dport'), 166 ) 167 raw = sp.Raw(load=build_payload(send_length)) 168 if send_frag_length: 169 for packet in sp.fragment(ip / udp / raw, fragsize=send_frag_length): 170 packets.append(ether / packet) 171 else: 172 packets.append(ether / ip / udp / raw) 173 174 for packet in packets: 175 sp.sendp(packet, iface=send_params['sendif'], verbose=False) 176 177 178def send_ping(ping_type, send_params): 179 if ping_type == 'icmp': 180 send_icmp_ping(send_params) 181 elif ( 182 ping_type == 'tcpsyn' or 183 ping_type == 'tcp3way' 184 ): 185 send_tcp_syn(send_params) 186 elif ping_type == 'udp': 187 send_udp(send_params) 188 else: 189 raise Exception('Unsupported ping type') 190 191 192def check_ipv4(expect_params, packet): 193 src_address = expect_params.get('src_address') 194 dst_address = expect_params.get('dst_address') 195 flags = expect_params.get('flags') 196 tos = expect_params.get('tc') 197 ttl = expect_params.get('hlim') 198 ip = packet.getlayer(sp.IP) 199 LOGGER.debug(f'Packet: {ip}') 200 if not ip: 201 LOGGER.debug('Packet is not IPv4!') 202 return False 203 if src_address and ip.src != src_address: 204 LOGGER.debug(f'Wrong IPv4 source {ip.src}, expected {src_address}') 205 return False 206 if dst_address and ip.dst != dst_address: 207 LOGGER.debug(f'Wrong IPv4 destination {ip.dst}, expected {dst_address}') 208 return False 209 if flags and ip.flags != flags: 210 LOGGER.debug(f'Wrong IP flags value {ip.flags}, expected {flags}') 211 return False 212 if tos and ip.tos != tos: 213 LOGGER.debug(f'Wrong ToS value {ip.tos}, expected {tos}') 214 return False 215 if ttl and ip.ttl != ttl: 216 LOGGER.debug(f'Wrong TTL value {ip.ttl}, expected {ttl}') 217 return False 218 return True 219 220 221def check_ipv6(expect_params, packet): 222 src_address = expect_params.get('src_address') 223 dst_address = expect_params.get('dst_address') 224 flags = expect_params.get('flags') 225 hlim = expect_params.get('hlim') 226 tc = expect_params.get('tc') 227 ip6 = packet.getlayer(sp.IPv6) 228 if not ip6: 229 LOGGER.debug('Packet is not IPv6!') 230 return False 231 if src_address and socket.inet_pton(socket.AF_INET6, ip6.src) != \ 232 socket.inet_pton(socket.AF_INET6, src_address): 233 LOGGER.debug(f'Wrong IPv6 source {ip6.src}, expected {src_address}') 234 return False 235 if dst_address and socket.inet_pton(socket.AF_INET6, ip6.dst) != \ 236 socket.inet_pton(socket.AF_INET6, dst_address): 237 LOGGER.debug(f'Wrong IPv6 destination {ip6.dst}, expected {dst_address}') 238 return False 239 # IPv6 has no IP-level checksum. 240 if flags: 241 raise Exception("There's no fragmentation flags in IPv6") 242 if hlim and ip6.hlim != hlim: 243 LOGGER.debug(f'Wrong Hop Limit value {ip6.hlim}, expected {hlim}') 244 return False 245 if tc and ip6.tc != tc: 246 LOGGER.debug(f'Wrong TC value {ip6.tc}, expected {tc}') 247 return False 248 return True 249 250 251def check_ping_4(expect_params, packet): 252 expect_length = expect_params['length'] 253 if not check_ipv4(expect_params, packet): 254 return False 255 icmp = packet.getlayer(sp.ICMP) 256 if not icmp: 257 LOGGER.debug('Packet is not IPv4 ICMP!') 258 return False 259 raw = packet.getlayer(sp.Raw) 260 if not raw: 261 LOGGER.debug('Packet contains no payload!') 262 return False 263 if raw.load != build_payload(expect_length): 264 LOGGER.debug('Payload magic does not match!') 265 return False 266 return True 267 268 269def check_ping_request_4(expect_params, packet): 270 if not check_ping_4(expect_params, packet): 271 return False 272 icmp = packet.getlayer(sp.ICMP) 273 if sp.icmptypes[icmp.type] != 'echo-request': 274 LOGGER.debug('Packet is not IPv4 ICMP Echo Request!') 275 return False 276 return True 277 278 279def check_ping_reply_4(expect_params, packet): 280 if not check_ping_4(expect_params, packet): 281 return False 282 icmp = packet.getlayer(sp.ICMP) 283 if sp.icmptypes[icmp.type] != 'echo-reply': 284 LOGGER.debug('Packet is not IPv4 ICMP Echo Reply!') 285 return False 286 return True 287 288 289def check_ping_request_6(expect_params, packet): 290 expect_length = expect_params['length'] 291 if not check_ipv6(expect_params, packet): 292 return False 293 icmp = packet.getlayer(sp.ICMPv6EchoRequest) 294 if not icmp: 295 LOGGER.debug('Packet is not IPv6 ICMP Echo Request!') 296 return False 297 if icmp.data != build_payload(expect_length): 298 LOGGER.debug('Payload magic does not match!') 299 return False 300 return True 301 302 303def check_ping_reply_6(expect_params, packet): 304 expect_length = expect_params['length'] 305 if not check_ipv6(expect_params, packet): 306 return False 307 icmp = packet.getlayer(sp.ICMPv6EchoReply) 308 if not icmp: 309 LOGGER.debug('Packet is not IPv6 ICMP Echo Reply!') 310 return False 311 if icmp.data != build_payload(expect_length): 312 LOGGER.debug('Payload magic does not match!') 313 return False 314 return True 315 316 317def check_ping_request(args, packet): 318 src_address = args['expect_params'].get('src_address') 319 dst_address = args['expect_params'].get('dst_address') 320 if not (src_address or dst_address): 321 raise Exception('Source or destination address must be given to match the ping request!') 322 if ( 323 (src_address and ':' in src_address) or 324 (dst_address and ':' in dst_address) 325 ): 326 return check_ping_request_6(args['expect_params'], packet) 327 else: 328 return check_ping_request_4(args['expect_params'], packet) 329 330 331def check_ping_reply(args, packet): 332 src_address = args['expect_params'].get('src_address') 333 dst_address = args['expect_params'].get('dst_address') 334 if not (src_address or dst_address): 335 raise Exception('Source or destination address must be given to match the ping reply!') 336 if ( 337 (src_address and ':' in src_address) or 338 (dst_address and ':' in dst_address) 339 ): 340 return check_ping_reply_6(args['expect_params'], packet) 341 else: 342 return check_ping_reply_4(args['expect_params'], packet) 343 344 345def check_tcp(expect_params, packet): 346 tcp_flags = expect_params.get('tcp_flags') 347 mss = expect_params.get('mss') 348 seq = expect_params.get('seq') 349 tcp = packet.getlayer(sp.TCP) 350 if not tcp: 351 LOGGER.debug('Packet is not TCP!') 352 return False 353 chksum = tcp.chksum 354 tcp.chksum = None 355 newpacket = sp.Ether(sp.raw(packet[sp.Ether])) 356 new_chksum = newpacket[sp.TCP].chksum 357 if new_chksum and chksum != new_chksum: 358 LOGGER.debug(f'Wrong TCP checksum {chksum}, expected {new_chksum}!') 359 return False 360 if tcp_flags and tcp.flags != tcp_flags: 361 LOGGER.debug(f'Wrong TCP flags {tcp.flags}, expected {tcp_flags}!') 362 return False 363 if seq: 364 if tcp_flags == 'S': 365 tcp_seq = tcp.seq 366 elif tcp_flags == 'SA': 367 tcp_seq = tcp.ack - 1 368 if seq != tcp_seq: 369 LOGGER.debug(f'Wrong TCP Sequence Number {tcp_seq}, expected {seq}') 370 return False 371 if mss: 372 for option in tcp.options: 373 if option[0] == 'MSS': 374 if option[1] != mss: 375 LOGGER.debug(f'Wrong TCP MSS {option[1]}, expected {mss}') 376 return False 377 return True 378 379 380def check_udp(expect_params, packet): 381 expect_length = expect_params['length'] 382 udp = packet.getlayer(sp.UDP) 383 if not udp: 384 LOGGER.debug('Packet is not UDP!') 385 return False 386 raw = packet.getlayer(sp.Raw) 387 if not raw: 388 LOGGER.debug('Packet contains no payload!') 389 return False 390 if raw.load != build_payload(expect_length): 391 LOGGER.debug(f'Payload magic does not match len {len(raw.load)} vs {expect_length}!') 392 return False 393 orig_chksum = udp.chksum 394 udp.chksum = None 395 newpacket = sp.Ether(sp.raw(packet[sp.Ether])) 396 new_chksum = newpacket[sp.UDP].chksum 397 if new_chksum and orig_chksum != new_chksum: 398 LOGGER.debug(f'Wrong UDP checksum {orig_chksum}, expected {new_chksum}!') 399 return False 400 401 return True 402 403 404def check_tcp_syn_request_4(expect_params, packet): 405 if not check_ipv4(expect_params, packet): 406 return False 407 if not check_tcp(expect_params | {'tcp_flags': 'S'}, packet): 408 return False 409 return True 410 411 412def check_tcp_syn_reply_4(send_params, expect_params, packet): 413 if not check_ipv4(expect_params, packet): 414 return False 415 if not check_tcp(expect_params | {'tcp_flags': 'SA'}, packet): 416 return False 417 return True 418 419 420def check_tcp_3way_4(args, packet): 421 send_params = args['send_params'] 422 423 expect_params_sa = clean_params(args['expect_params']) 424 expect_params_sa['src_address'] = send_params['dst_address'] 425 expect_params_sa['dst_address'] = send_params['src_address'] 426 427 # Sniff incoming SYN+ACK packet 428 if ( 429 check_ipv4(expect_params_sa, packet) and 430 check_tcp(expect_params_sa | {'tcp_flags': 'SA'}, packet) 431 ): 432 ether = sp.Ether() 433 ip_sa = packet.getlayer(sp.IP) 434 tcp_sa = packet.getlayer(sp.TCP) 435 reply_params = clean_params(send_params) 436 reply_params['src_address'] = ip_sa.dst 437 reply_params['dst_address'] = ip_sa.src 438 ip_a = prepare_ipv4(reply_params) 439 tcp_a = sp.TCP( 440 sport=tcp_sa.dport, dport=tcp_sa.sport, flags='A', 441 seq=tcp_sa.ack, ack=tcp_sa.seq + 1, 442 ) 443 req = ether / ip_a / tcp_a 444 sp.sendp(req, iface=send_params['sendif'], verbose=False) 445 return True 446 447 return False 448 449 450def check_udp_request_4(expect_params, packet): 451 if not check_ipv4(expect_params, packet): 452 return False 453 if not check_udp(expect_params, packet): 454 return False 455 return True 456 457 458def check_tcp_syn_request_6(expect_params, packet): 459 if not check_ipv6(expect_params, packet): 460 return False 461 if not check_tcp(expect_params | {'tcp_flags': 'S'}, packet): 462 return False 463 return True 464 465 466def check_tcp_syn_reply_6(expect_params, packet): 467 if not check_ipv6(expect_params, packet): 468 return False 469 if not check_tcp(expect_params | {'tcp_flags': 'SA'}, packet): 470 return False 471 return True 472 473 474def check_tcp_3way_6(args, packet): 475 send_params = args['send_params'] 476 477 expect_params_sa = clean_params(args['expect_params']) 478 expect_params_sa['src_address'] = send_params['dst_address'] 479 expect_params_sa['dst_address'] = send_params['src_address'] 480 481 # Sniff incoming SYN+ACK packet 482 if ( 483 check_ipv6(expect_params_sa, packet) and 484 check_tcp(expect_params_sa | {'tcp_flags': 'SA'}, packet) 485 ): 486 ether = sp.Ether() 487 ip6_sa = packet.getlayer(sp.IPv6) 488 tcp_sa = packet.getlayer(sp.TCP) 489 reply_params = clean_params(send_params) 490 reply_params['src_address'] = ip6_sa.dst 491 reply_params['dst_address'] = ip6_sa.src 492 ip_a = prepare_ipv6(reply_params) 493 tcp_a = sp.TCP( 494 sport=tcp_sa.dport, dport=tcp_sa.sport, flags='A', 495 seq=tcp_sa.ack, ack=tcp_sa.seq + 1, 496 ) 497 req = ether / ip_a / tcp_a 498 sp.sendp(req, iface=send_params['sendif'], verbose=False) 499 return True 500 501 return False 502 503 504def check_udp_request_6(expect_params, packet): 505 if not check_ipv6(expect_params, packet): 506 return False 507 if not check_udp(expect_params, packet): 508 return False 509 return True 510 511def check_tcp_syn_request(args, packet): 512 expect_params = args['expect_params'] 513 src_address = expect_params.get('src_address') 514 dst_address = expect_params.get('dst_address') 515 if not (src_address or dst_address): 516 raise Exception('Source or destination address must be given to match the tcp syn request!') 517 if ( 518 (src_address and ':' in src_address) or 519 (dst_address and ':' in dst_address) 520 ): 521 return check_tcp_syn_request_6(expect_params, packet) 522 else: 523 return check_tcp_syn_request_4(expect_params, packet) 524 525 526def check_tcp_syn_reply(args, packet): 527 expect_params = args['expect_params'] 528 src_address = expect_params.get('src_address') 529 dst_address = expect_params.get('dst_address') 530 if not (src_address or dst_address): 531 raise Exception('Source or destination address must be given to match the tcp syn reply!') 532 if ( 533 (src_address and ':' in src_address) or 534 (dst_address and ':' in dst_address) 535 ): 536 return check_tcp_syn_reply_6(expect_params, packet) 537 else: 538 return check_tcp_syn_reply_4(expect_params, packet) 539 540def check_tcp_3way(args, packet): 541 expect_params = args['expect_params'] 542 src_address = expect_params.get('src_address') 543 dst_address = expect_params.get('dst_address') 544 if not (src_address or dst_address): 545 raise Exception('Source or destination address must be given to match the tcp syn reply!') 546 if ( 547 (src_address and ':' in src_address) or 548 (dst_address and ':' in dst_address) 549 ): 550 return check_tcp_3way_6(args, packet) 551 else: 552 return check_tcp_3way_4(args, packet) 553 554 555def check_udp_request(args, packet): 556 expect_params = args['expect_params'] 557 src_address = expect_params.get('src_address') 558 dst_address = expect_params.get('dst_address') 559 if not (src_address or dst_address): 560 raise Exception('Source or destination address must be given to match the tcp syn request!') 561 if ( 562 (src_address and ':' in src_address) or 563 (dst_address and ':' in dst_address) 564 ): 565 return check_udp_request_6(expect_params, packet) 566 else: 567 return check_udp_request_4(expect_params, packet) 568 569 570def setup_sniffer( 571 recvif, ping_type, sniff_type, expect_params, defrag, send_params, 572): 573 if ping_type == 'icmp' and sniff_type == 'request': 574 checkfn = check_ping_request 575 elif ping_type == 'icmp' and sniff_type == 'reply': 576 checkfn = check_ping_reply 577 elif ping_type == 'tcpsyn' and sniff_type == 'request': 578 checkfn = check_tcp_syn_request 579 elif ping_type == 'tcpsyn' and sniff_type == 'reply': 580 checkfn = check_tcp_syn_reply 581 elif ping_type == 'tcp3way' and sniff_type == 'reply': 582 checkfn = check_tcp_3way 583 elif ping_type == 'udp' and sniff_type == 'request': 584 checkfn = check_udp_request 585 else: 586 raise Exception('Unspported ping and sniff type combination') 587 588 return Sniffer( 589 {'send_params': send_params, 'expect_params': expect_params}, 590 checkfn, recvif, defrag=defrag, 591 ) 592 593 594def parse_args(): 595 parser = argparse.ArgumentParser("pft_ping.py", 596 description="Ping test tool") 597 598 # Parameters of sent ping request 599 parser.add_argument('--sendif', required=True, 600 help='The interface through which the packet(s) will be sent') 601 parser.add_argument('--to', required=True, 602 help='The destination IP address for the ping request') 603 parser.add_argument('--ping-type', 604 choices=('icmp', 'tcpsyn', 'tcp3way', 'udp'), 605 help='Type of ping: ICMP (default) or TCP SYN or 3-way TCP handshake', 606 default='icmp') 607 parser.add_argument('--fromaddr', 608 help='The source IP address for the ping request') 609 610 # Where to look for packets to analyze. 611 # The '+' format is ugly as it mixes positional with optional syntax. 612 # But we have no positional parameters so I guess it's fine to use it. 613 parser.add_argument('--recvif', nargs='+', 614 help='The interfaces on which to expect the ping request') 615 parser.add_argument('--replyif', nargs='+', 616 help='The interfaces which to expect the ping response') 617 618 # Packet settings 619 parser_send = parser.add_argument_group('Values set in transmitted packets') 620 parser_send.add_argument('--send-flags', type=str, 621 help='IPv4 fragmentation flags') 622 parser_send.add_argument('--send-frag-length', type=int, 623 help='Force IP fragmentation with given fragment length') 624 parser_send.add_argument('--send-hlim', type=int, 625 help='IPv6 Hop Limit or IPv4 Time To Live') 626 parser_send.add_argument('--send-mss', type=int, 627 help='TCP Maximum Segment Size') 628 parser_send.add_argument('--send-seq', type=int, 629 help='TCP sequence number') 630 parser_send.add_argument('--send-sport', type=int, 631 help='TCP source port') 632 parser_send.add_argument('--send-dport', type=int, default=9, 633 help='TCP destination port') 634 parser_send.add_argument('--send-length', type=int, default=len(PAYLOAD_MAGIC), 635 help='ICMP Echo Request payload size') 636 parser_send.add_argument('--send-tc', type=int, 637 help='IPv6 Traffic Class or IPv4 DiffServ / ToS') 638 parser_send.add_argument('--send-tcpopt-unaligned', action='store_true', 639 help='Include unaligned TCP options') 640 parser_send.add_argument('--send-nop', action='store_true', 641 help='Include a NOP IPv4 option') 642 643 # Expectations 644 parser_expect = parser.add_argument_group('Values expected in sniffed packets') 645 parser_expect.add_argument('--expect-flags', type=str, 646 help='IPv4 fragmentation flags') 647 parser_expect.add_argument('--expect-hlim', type=int, 648 help='IPv6 Hop Limit or IPv4 Time To Live') 649 parser_expect.add_argument('--expect-mss', type=int, 650 help='TCP Maximum Segment Size') 651 parser_send.add_argument('--expect-seq', type=int, 652 help='TCP sequence number') 653 parser_expect.add_argument('--expect-tc', type=int, 654 help='IPv6 Traffic Class or IPv4 DiffServ / ToS') 655 656 parser.add_argument('-v', '--verbose', action='store_true', 657 help=('Enable verbose logging. Apart of potentially useful information ' 658 'you might see warnings from parsing packets like NDP or other ' 659 'packets not related to the test being run. Use only when ' 660 'developing because real tests expect empty stderr and stdout.')) 661 662 return parser.parse_args() 663 664 665def main(): 666 args = parse_args() 667 668 if args.verbose: 669 LOGGER.setLevel(logging.DEBUG) 670 671 # Split parameters into send and expect parameters. Parameters might be 672 # missing from the command line, always fill the dictionaries with None. 673 send_params = {} 674 expect_params = {} 675 for param_name in ( 676 'flags', 'hlim', 'length', 'mss', 'seq', 'tc', 'frag_length', 677 'sport', 'dport', 678 ): 679 param_arg = vars(args).get(f'send_{param_name}') 680 send_params[param_name] = param_arg if param_arg else None 681 param_arg = vars(args).get(f'expect_{param_name}') 682 expect_params[param_name] = param_arg if param_arg else None 683 684 expect_params['length'] = send_params['length'] 685 send_params['tcpopt_unaligned'] = args.send_tcpopt_unaligned 686 send_params['nop'] = args.send_nop 687 send_params['src_address'] = args.fromaddr if args.fromaddr else None 688 send_params['dst_address'] = args.to 689 send_params['sendif'] = args.sendif 690 691 # We may not have a default route. Tell scapy where to start looking for routes 692 sp.conf.iface6 = args.sendif 693 694 # Configuration sanity checking. 695 if not (args.replyif or args.recvif): 696 raise Exception('With no reply or recv interface specified no traffic ' 697 'can be sniffed and verified!' 698 ) 699 700 sniffers = [] 701 702 if send_params['frag_length']: 703 if ( 704 (send_params['src_address'] and ':' in send_params['src_address']) or 705 (send_params['dst_address'] and ':' in send_params['dst_address']) 706 ): 707 defrag = 'IPv6' 708 else: 709 defrag = 'IPv4' 710 else: 711 defrag = False 712 713 if args.recvif: 714 sniffer_params = copy(expect_params) 715 sniffer_params['src_address'] = None 716 sniffer_params['dst_address'] = args.to 717 for iface in args.recvif: 718 LOGGER.debug(f'Installing receive sniffer on {iface}') 719 sniffers.append( 720 setup_sniffer(iface, args.ping_type, 'request', 721 sniffer_params, defrag, send_params, 722 )) 723 724 if args.replyif: 725 sniffer_params = copy(expect_params) 726 sniffer_params['src_address'] = args.to 727 sniffer_params['dst_address'] = None 728 for iface in args.replyif: 729 LOGGER.debug(f'Installing reply sniffer on {iface}') 730 sniffers.append( 731 setup_sniffer(iface, args.ping_type, 'reply', 732 sniffer_params, defrag, send_params, 733 )) 734 735 LOGGER.debug(f'Installed {len(sniffers)} sniffers') 736 737 send_ping(args.ping_type, send_params) 738 739 err = 0 740 sniffer_num = 0 741 for sniffer in sniffers: 742 sniffer.join() 743 if sniffer.correctPackets == 1: 744 LOGGER.debug(f'Expected ping has been sniffed on {sniffer._recvif}.') 745 else: 746 # Set a bit in err for each failed sniffer. 747 err |= 1<<sniffer_num 748 if sniffer.correctPackets > 1: 749 LOGGER.debug(f'Duplicated ping has been sniffed on {sniffer._recvif}!') 750 else: 751 LOGGER.debug(f'Expected ping has not been sniffed on {sniffer._recvif}!') 752 sniffer_num += 1 753 754 return err 755 756 757if __name__ == '__main__': 758 sys.exit(main()) 759