xref: /freebsd/tests/sys/netpfil/common/pft_ping.py (revision 59f5f100b774de8824fb2fc1a8a11a93bbc2dafd)
1#!/usr/bin/env python3
2#
3# SPDX-License-Identifier: BSD-2-Clause
4#
5# Copyright (c) 2017 Kristof Provost <kp@FreeBSD.org>
6# Copyright (c) 2023 Kajetan Staszkiewicz <vegeta@tuxpowered.net>
7#
8# Redistribution and use in source and binary forms, with or without
9# modification, are permitted provided that the following conditions
10# are met:
11# 1. Redistributions of source code must retain the above copyright
12#    notice, this list of conditions and the following disclaimer.
13# 2. Redistributions in binary form must reproduce the above copyright
14#    notice, this list of conditions and the following disclaimer in the
15#    documentation and/or other materials provided with the distribution.
16#
17# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
18# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20# ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
21# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
22# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
23# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
24# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
26# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
27# SUCH DAMAGE.
28#
29
30import argparse
31import logging
32logging.getLogger("scapy").setLevel(logging.CRITICAL)
33import math
34import scapy.all as sp
35import sys
36
37from copy import copy
38from sniffer import Sniffer
39
40logging.basicConfig(format='%(message)s')
41LOGGER = logging.getLogger(__name__)
42
43PAYLOAD_MAGIC = bytes.fromhex('42c0ffee')
44
45def build_payload(l):
46    pl = len(PAYLOAD_MAGIC)
47    ret = PAYLOAD_MAGIC * math.floor(l/pl)
48    ret += PAYLOAD_MAGIC[0:(l % pl)]
49    return ret
50
51
52def clean_params(params):
53    # Prepare a copy of safe copy of params
54    ret = copy(params)
55    ret.pop('src_address')
56    ret.pop('dst_address')
57    ret.pop('flags')
58    return ret
59
60def prepare_ipv6(send_params):
61    src_address = send_params.get('src_address')
62    dst_address = send_params.get('dst_address')
63    hlim = send_params.get('hlim')
64    tc = send_params.get('tc')
65    ip6 = sp.IPv6(dst=dst_address)
66    if src_address:
67        ip6.src = src_address
68    if hlim:
69        ip6.hlim = hlim
70    if tc:
71        ip6.tc = tc
72    return ip6
73
74
75def prepare_ipv4(send_params):
76    src_address = send_params.get('src_address')
77    dst_address = send_params.get('dst_address')
78    flags = send_params.get('flags')
79    tos = send_params.get('tc')
80    ttl = send_params.get('hlim')
81    opt = send_params.get('nop')
82    options = ''
83    if opt:
84        options='\x00'
85    ip = sp.IP(dst=dst_address, options=options)
86    if src_address:
87        ip.src = src_address
88    if flags:
89        ip.flags = flags
90    if tos:
91        ip.tos = tos
92    if ttl:
93        ip.ttl = ttl
94    return ip
95
96
97def send_icmp_ping(send_params):
98    send_length = send_params['length']
99    send_frag_length = send_params['frag_length']
100    packets = []
101    ether = sp.Ether()
102    if ':' in send_params['dst_address']:
103        ip6 = prepare_ipv6(send_params)
104        icmp = sp.ICMPv6EchoRequest(data=sp.raw(build_payload(send_length)))
105        if send_frag_length:
106            for packet in sp.fragment6(ip6 / icmp, fragSize=send_frag_length):
107                packets.append(ether / packet)
108        else:
109            packets.append(ether / ip6 / icmp)
110
111    else:
112        ip = prepare_ipv4(send_params)
113        icmp = sp.ICMP(type='echo-request')
114        raw = sp.raw(build_payload(send_length))
115        if send_frag_length:
116            for packet in sp.fragment(ip / icmp / raw, fragsize=send_frag_length):
117                packets.append(ether / packet)
118        else:
119            packets.append(ether / ip / icmp / raw)
120    for packet in packets:
121        sp.sendp(packet, iface=send_params['sendif'], verbose=False)
122
123
124def send_tcp_syn(send_params):
125    tcpopt_unaligned = send_params.get('tcpopt_unaligned')
126    seq = send_params.get('seq')
127    mss = send_params.get('mss')
128    ether = sp.Ether()
129    opts=[('Timestamp', (1, 1)), ('MSS', mss if mss else 1280)]
130    if tcpopt_unaligned:
131        opts = [('NOP', 0 )] + opts
132    if ':' in send_params['dst_address']:
133        ip = prepare_ipv6(send_params)
134    else:
135        ip = prepare_ipv4(send_params)
136    tcp = sp.TCP(
137        sport=send_params.get('sport'), dport=send_params.get('dport'),
138        flags='S', options=opts, seq=seq,
139    )
140    req = ether / ip / tcp
141    sp.sendp(req, iface=send_params['sendif'], verbose=False)
142
143
144def send_udp(send_params):
145    LOGGER.debug(f'Sending UDP ping')
146    packets = []
147    send_length = send_params['length']
148    send_frag_length = send_params['frag_length']
149    ether = sp.Ether()
150    if ':' in send_params['dst_address']:
151        ip6 = prepare_ipv6(send_params)
152        udp = sp.UDP(
153            sport=send_params.get('sport'), dport=send_params.get('dport'),
154        )
155        raw = sp.Raw(load=build_payload(send_length))
156        if send_frag_length:
157            for packet in sp.fragment6(ip6 / udp / raw, fragSize=send_frag_length):
158                packets.append(ether / packet)
159        else:
160            packets.append(ether / ip6 / udp / raw)
161    else:
162        ip = prepare_ipv4(send_params)
163        udp = sp.UDP(
164            sport=send_params.get('sport'), dport=send_params.get('dport'),
165        )
166        raw = sp.Raw(load=build_payload(send_length))
167        if send_frag_length:
168            for packet in sp.fragment(ip / udp / raw, fragsize=send_frag_length):
169                packets.append(ether / packet)
170        else:
171            packets.append(ether / ip / udp / raw)
172
173    for packet in packets:
174        sp.sendp(packet, iface=send_params['sendif'], verbose=False)
175
176
177def send_ping(ping_type, send_params):
178    if ping_type == 'icmp':
179        send_icmp_ping(send_params)
180    elif (
181        ping_type == 'tcpsyn' or
182        ping_type == 'tcp3way'
183    ):
184        send_tcp_syn(send_params)
185    elif ping_type == 'udp':
186        send_udp(send_params)
187    else:
188        raise Exception('Unsupported ping type')
189
190
191def check_ipv4(expect_params, packet):
192    src_address = expect_params.get('src_address')
193    dst_address = expect_params.get('dst_address')
194    flags = expect_params.get('flags')
195    tos = expect_params.get('tc')
196    ttl = expect_params.get('hlim')
197    ip = packet.getlayer(sp.IP)
198    LOGGER.debug(f'Packet: {ip}')
199    if not ip:
200        LOGGER.debug('Packet is not IPv4!')
201        return False
202    if src_address and ip.src != src_address:
203        LOGGER.debug(f'Wrong IPv4 source {ip.src}, expected {src_address}')
204        return False
205    if dst_address and ip.dst != dst_address:
206        LOGGER.debug(f'Wrong IPv4 destination {ip.dst}, expected {dst_address}')
207        return False
208    chksum = ip.chksum
209    ip.chksum = None
210    new_chksum = sp.IP(sp.raw(ip)).chksum
211    if chksum != new_chksum:
212        LOGGER.debug(f'Wrong IPv4 checksum {chksum}, expected {new_chksum}')
213        return False
214    if flags and ip.flags != flags:
215        LOGGER.debug(f'Wrong IP flags value {ip.flags}, expected {flags}')
216        return False
217    if tos and ip.tos != tos:
218        LOGGER.debug(f'Wrong ToS value {ip.tos}, expected {tos}')
219        return False
220    if ttl and ip.ttl != ttl:
221        LOGGER.debug(f'Wrong TTL value {ip.ttl}, expected {ttl}')
222        return False
223    return True
224
225
226def check_ipv6(expect_params, packet):
227    src_address = expect_params.get('src_address')
228    dst_address = expect_params.get('dst_address')
229    flags = expect_params.get('flags')
230    hlim = expect_params.get('hlim')
231    tc = expect_params.get('tc')
232    ip6 = packet.getlayer(sp.IPv6)
233    if not ip6:
234        LOGGER.debug('Packet is not IPv6!')
235        return False
236    if src_address and ip6.src != src_address:
237        LOGGER.debug(f'Wrong IPv6 source {ip6.src}, expected {src_address}')
238        return False
239    if dst_address and ip6.dst != dst_address:
240        LOGGER.debug(f'Wrong IPv6 destination {ip6.dst}, expected {dst_address}')
241        return False
242    # IPv6 has no IP-level checksum.
243    if flags:
244        raise Exception("There's no fragmentation flags in IPv6")
245    if hlim and ip6.hlim != hlim:
246        LOGGER.debug(f'Wrong Hop Limit value {ip6.hlim}, expected {hlim}')
247        return False
248    if tc and ip6.tc != tc:
249        LOGGER.debug(f'Wrong TC value {ip6.tc}, expected {tc}')
250        return False
251    return True
252
253
254def check_ping_4(expect_params, packet):
255    expect_length = expect_params['length']
256    if not check_ipv4(expect_params, packet):
257        return False
258    icmp = packet.getlayer(sp.ICMP)
259    if not icmp:
260        LOGGER.debug('Packet is not IPv4 ICMP!')
261        return False
262    raw = packet.getlayer(sp.Raw)
263    if not raw:
264        LOGGER.debug('Packet contains no payload!')
265        return False
266    if raw.load != build_payload(expect_length):
267        LOGGER.debug('Payload magic does not match!')
268        return False
269    return True
270
271
272def check_ping_request_4(expect_params, packet):
273    if not check_ping_4(expect_params, packet):
274        return False
275    icmp = packet.getlayer(sp.ICMP)
276    if sp.icmptypes[icmp.type] != 'echo-request':
277        LOGGER.debug('Packet is not IPv4 ICMP Echo Request!')
278        return False
279    return True
280
281
282def check_ping_reply_4(expect_params, packet):
283    if not check_ping_4(expect_params, packet):
284        return False
285    icmp = packet.getlayer(sp.ICMP)
286    if sp.icmptypes[icmp.type] != 'echo-reply':
287        LOGGER.debug('Packet is not IPv4 ICMP Echo Reply!')
288        return False
289    return True
290
291
292def check_ping_request_6(expect_params, packet):
293    expect_length = expect_params['length']
294    if not check_ipv6(expect_params, packet):
295        return False
296    icmp = packet.getlayer(sp.ICMPv6EchoRequest)
297    if not icmp:
298        LOGGER.debug('Packet is not IPv6 ICMP Echo Request!')
299        return False
300    if icmp.data != build_payload(expect_length):
301        LOGGER.debug('Payload magic does not match!')
302        return False
303    return True
304
305
306def check_ping_reply_6(expect_params, packet):
307    expect_length = expect_params['length']
308    if not check_ipv6(expect_params, packet):
309        return False
310    icmp = packet.getlayer(sp.ICMPv6EchoReply)
311    if not icmp:
312        LOGGER.debug('Packet is not IPv6 ICMP Echo Reply!')
313        return False
314    if icmp.data != build_payload(expect_length):
315        LOGGER.debug('Payload magic does not match!')
316        return False
317    return True
318
319
320def check_ping_request(args, packet):
321    src_address = args['expect_params'].get('src_address')
322    dst_address = args['expect_params'].get('dst_address')
323    if not (src_address or dst_address):
324        raise Exception('Source or destination address must be given to match the ping request!')
325    if (
326        (src_address and ':' in src_address) or
327        (dst_address and ':' in dst_address)
328    ):
329        return check_ping_request_6(args['expect_params'], packet)
330    else:
331        return check_ping_request_4(args['expect_params'], packet)
332
333
334def check_ping_reply(args, packet):
335    src_address = args['expect_params'].get('src_address')
336    dst_address = args['expect_params'].get('dst_address')
337    if not (src_address or dst_address):
338        raise Exception('Source or destination address must be given to match the ping reply!')
339    if (
340        (src_address and ':' in src_address) or
341        (dst_address and ':' in dst_address)
342    ):
343        return check_ping_reply_6(args['expect_params'], packet)
344    else:
345        return check_ping_reply_4(args['expect_params'], packet)
346
347
348def check_tcp(expect_params, packet):
349    tcp_flags = expect_params.get('tcp_flags')
350    mss = expect_params.get('mss')
351    seq = expect_params.get('seq')
352    tcp = packet.getlayer(sp.TCP)
353    if not tcp:
354        LOGGER.debug('Packet is not TCP!')
355        return False
356    chksum = tcp.chksum
357    tcp.chksum = None
358    newpacket = sp.Ether(sp.raw(packet[sp.Ether]))
359    new_chksum = newpacket[sp.TCP].chksum
360    if new_chksum and chksum != new_chksum:
361        LOGGER.debug(f'Wrong TCP checksum {chksum}, expected {new_chksum}!')
362        return False
363    if tcp_flags and tcp.flags != tcp_flags:
364        LOGGER.debug(f'Wrong TCP flags {tcp.flags}, expected {tcp_flags}!')
365        return False
366    if seq:
367        if tcp_flags == 'S':
368            tcp_seq = tcp.seq
369        elif tcp_flags == 'SA':
370            tcp_seq = tcp.ack - 1
371        if seq != tcp_seq:
372            LOGGER.debug(f'Wrong TCP Sequence Number {tcp_seq}, expected {seq}')
373            return False
374    if mss:
375        for option in tcp.options:
376            if option[0] == 'MSS':
377                if option[1] != mss:
378                    LOGGER.debug(f'Wrong TCP MSS {option[1]}, expected {mss}')
379                    return False
380    return True
381
382
383def check_udp(expect_params, packet):
384    expect_length = expect_params['length']
385    udp = packet.getlayer(sp.UDP)
386    if not udp:
387        LOGGER.debug('Packet is not UDP!')
388        return False
389    raw = packet.getlayer(sp.Raw)
390    if not raw:
391        LOGGER.debug('Packet contains no payload!')
392        return False
393    if raw.load != build_payload(expect_length):
394        LOGGER.debug(f'Payload magic does not match len {len(raw.load)} vs {expect_length}!')
395        return False
396    orig_chksum = udp.chksum
397    udp.chksum = None
398    newpacket = sp.Ether(sp.raw(packet[sp.Ether]))
399    new_chksum = newpacket[sp.UDP].chksum
400    if new_chksum and orig_chksum != new_chksum:
401        LOGGER.debug(f'Wrong UDP checksum {orig_chksum}, expected {new_chksum}!')
402        return False
403
404    return True
405
406
407def check_tcp_syn_request_4(expect_params, packet):
408    if not check_ipv4(expect_params, packet):
409        return False
410    if not check_tcp(expect_params | {'tcp_flags': 'S'}, packet):
411        return False
412    return True
413
414
415def check_tcp_syn_reply_4(send_params, expect_params, packet):
416    if not check_ipv4(expect_params, packet):
417        return False
418    if not check_tcp(expect_params | {'tcp_flags': 'SA'}, packet):
419        return False
420    return True
421
422
423def check_tcp_3way_4(args, packet):
424    send_params = args['send_params']
425
426    expect_params_sa = clean_params(args['expect_params'])
427    expect_params_sa['src_address'] = send_params['dst_address']
428    expect_params_sa['dst_address'] = send_params['src_address']
429
430    # Sniff incoming SYN+ACK packet
431    if (
432        check_ipv4(expect_params_sa, packet) and
433        check_tcp(expect_params_sa | {'tcp_flags': 'SA'}, packet)
434    ):
435        ether = sp.Ether()
436        ip_sa = packet.getlayer(sp.IP)
437        tcp_sa = packet.getlayer(sp.TCP)
438        reply_params = clean_params(send_params)
439        reply_params['src_address'] = ip_sa.dst
440        reply_params['dst_address'] = ip_sa.src
441        ip_a = prepare_ipv4(reply_params)
442        tcp_a = sp.TCP(
443            sport=tcp_sa.dport, dport=tcp_sa.sport, flags='A',
444            seq=tcp_sa.ack, ack=tcp_sa.seq + 1,
445        )
446        req = ether / ip_a / tcp_a
447        sp.sendp(req, iface=send_params['sendif'], verbose=False)
448        return True
449
450    return False
451
452
453def check_udp_request_4(expect_params, packet):
454    if not check_ipv4(expect_params, packet):
455        return False
456    if not check_udp(expect_params, packet):
457        return False
458    return True
459
460
461def check_tcp_syn_request_6(expect_params, packet):
462    if not check_ipv6(expect_params, packet):
463        return False
464    if not check_tcp(expect_params | {'tcp_flags': 'S'}, packet):
465        return False
466    return True
467
468
469def check_tcp_syn_reply_6(expect_params, packet):
470    if not check_ipv6(expect_params, packet):
471        return False
472    if not check_tcp(expect_params | {'tcp_flags': 'SA'}, packet):
473        return False
474    return True
475
476
477def check_tcp_3way_6(args, packet):
478    send_params = args['send_params']
479
480    expect_params_sa = clean_params(args['expect_params'])
481    expect_params_sa['src_address'] = send_params['dst_address']
482    expect_params_sa['dst_address'] = send_params['src_address']
483
484    # Sniff incoming SYN+ACK packet
485    if (
486        check_ipv6(expect_params_sa, packet) and
487        check_tcp(expect_params_sa | {'tcp_flags': 'SA'}, packet)
488    ):
489        ether = sp.Ether()
490        ip6_sa = packet.getlayer(sp.IPv6)
491        tcp_sa = packet.getlayer(sp.TCP)
492        reply_params = clean_params(send_params)
493        reply_params['src_address'] = ip6_sa.dst
494        reply_params['dst_address'] = ip6_sa.src
495        ip_a = prepare_ipv6(reply_params)
496        tcp_a = sp.TCP(
497            sport=tcp_sa.dport, dport=tcp_sa.sport, flags='A',
498            seq=tcp_sa.ack, ack=tcp_sa.seq + 1,
499        )
500        req = ether / ip_a / tcp_a
501        sp.sendp(req, iface=send_params['sendif'], verbose=False)
502        return True
503
504    return False
505
506
507def check_udp_request_6(expect_params, packet):
508    if not check_ipv6(expect_params, packet):
509        return False
510    if not check_udp(expect_params, packet):
511        return False
512    return True
513
514def check_tcp_syn_request(args, packet):
515    expect_params = args['expect_params']
516    src_address = expect_params.get('src_address')
517    dst_address = expect_params.get('dst_address')
518    if not (src_address or dst_address):
519        raise Exception('Source or destination address must be given to match the tcp syn request!')
520    if (
521        (src_address and ':' in src_address) or
522        (dst_address and ':' in dst_address)
523    ):
524        return check_tcp_syn_request_6(expect_params, packet)
525    else:
526        return check_tcp_syn_request_4(expect_params, packet)
527
528
529def check_tcp_syn_reply(args, packet):
530    expect_params = args['expect_params']
531    src_address = expect_params.get('src_address')
532    dst_address = expect_params.get('dst_address')
533    if not (src_address or dst_address):
534        raise Exception('Source or destination address must be given to match the tcp syn reply!')
535    if (
536        (src_address and ':' in src_address) or
537        (dst_address and ':' in dst_address)
538    ):
539        return check_tcp_syn_reply_6(expect_params, packet)
540    else:
541        return check_tcp_syn_reply_4(expect_params, packet)
542
543def check_tcp_3way(args, packet):
544    expect_params = args['expect_params']
545    src_address = expect_params.get('src_address')
546    dst_address = expect_params.get('dst_address')
547    if not (src_address or dst_address):
548        raise Exception('Source or destination address must be given to match the tcp syn reply!')
549    if (
550            (src_address and ':' in src_address) or
551            (dst_address and ':' in dst_address)
552    ):
553        return check_tcp_3way_6(args, packet)
554    else:
555        return check_tcp_3way_4(args, packet)
556
557
558def check_udp_request(args, packet):
559    expect_params = args['expect_params']
560    src_address = expect_params.get('src_address')
561    dst_address = expect_params.get('dst_address')
562    if not (src_address or dst_address):
563        raise Exception('Source or destination address must be given to match the tcp syn request!')
564    if (
565            (src_address and ':' in src_address) or
566            (dst_address and ':' in dst_address)
567    ):
568        return check_udp_request_6(expect_params, packet)
569    else:
570        return check_udp_request_4(expect_params, packet)
571
572
573def setup_sniffer(
574        recvif, ping_type, sniff_type, expect_params, defrag, send_params,
575):
576    if ping_type == 'icmp' and sniff_type == 'request':
577        checkfn = check_ping_request
578    elif ping_type == 'icmp' and sniff_type == 'reply':
579        checkfn = check_ping_reply
580    elif ping_type == 'tcpsyn' and sniff_type == 'request':
581        checkfn = check_tcp_syn_request
582    elif ping_type == 'tcpsyn' and sniff_type == 'reply':
583        checkfn = check_tcp_syn_reply
584    elif ping_type == 'tcp3way' and sniff_type == 'reply':
585        checkfn = check_tcp_3way
586    elif ping_type == 'udp' and sniff_type == 'request':
587        checkfn = check_udp_request
588    else:
589        raise Exception('Unspported ping and sniff type combination')
590
591    return Sniffer(
592        {'send_params': send_params, 'expect_params': expect_params},
593        checkfn, recvif, defrag=defrag,
594    )
595
596
597def parse_args():
598    parser = argparse.ArgumentParser("pft_ping.py",
599        description="Ping test tool")
600
601    # Parameters of sent ping request
602    parser.add_argument('--sendif', required=True,
603        help='The interface through which the packet(s) will be sent')
604    parser.add_argument('--to', required=True,
605        help='The destination IP address for the ping request')
606    parser.add_argument('--ping-type',
607        choices=('icmp', 'tcpsyn', 'tcp3way', 'udp'),
608        help='Type of ping: ICMP (default) or TCP SYN or 3-way TCP handshake',
609        default='icmp')
610    parser.add_argument('--fromaddr',
611        help='The source IP address for the ping request')
612
613    # Where to look for packets to analyze.
614    # The '+' format is ugly as it mixes positional with optional syntax.
615    # But we have no positional parameters so I guess it's fine to use it.
616    parser.add_argument('--recvif', nargs='+',
617        help='The interfaces on which to expect the ping request')
618    parser.add_argument('--replyif', nargs='+',
619        help='The interfaces which to expect the ping response')
620
621    # Packet settings
622    parser_send = parser.add_argument_group('Values set in transmitted packets')
623    parser_send.add_argument('--send-flags', type=str,
624        help='IPv4 fragmentation flags')
625    parser_send.add_argument('--send-frag-length', type=int,
626        help='Force IP fragmentation with given fragment length')
627    parser_send.add_argument('--send-hlim', type=int,
628        help='IPv6 Hop Limit or IPv4 Time To Live')
629    parser_send.add_argument('--send-mss', type=int,
630        help='TCP Maximum Segment Size')
631    parser_send.add_argument('--send-seq', type=int,
632        help='TCP sequence number')
633    parser_send.add_argument('--send-sport', type=int,
634        help='TCP source port')
635    parser_send.add_argument('--send-dport', type=int, default=9,
636        help='TCP destination port')
637    parser_send.add_argument('--send-length', type=int, default=len(PAYLOAD_MAGIC),
638        help='ICMP Echo Request payload size')
639    parser_send.add_argument('--send-tc', type=int,
640        help='IPv6 Traffic Class or IPv4 DiffServ / ToS')
641    parser_send.add_argument('--send-tcpopt-unaligned', action='store_true',
642        help='Include unaligned TCP options')
643    parser_send.add_argument('--send-nop', action='store_true',
644        help='Include a NOP IPv4 option')
645
646    # Expectations
647    parser_expect = parser.add_argument_group('Values expected in sniffed packets')
648    parser_expect.add_argument('--expect-flags', type=str,
649        help='IPv4 fragmentation flags')
650    parser_expect.add_argument('--expect-hlim', type=int,
651        help='IPv6 Hop Limit or IPv4 Time To Live')
652    parser_expect.add_argument('--expect-mss', type=int,
653        help='TCP Maximum Segment Size')
654    parser_send.add_argument('--expect-seq', type=int,
655        help='TCP sequence number')
656    parser_expect.add_argument('--expect-tc', type=int,
657        help='IPv6 Traffic Class or IPv4 DiffServ / ToS')
658
659    parser.add_argument('-v', '--verbose', action='store_true',
660        help=('Enable verbose logging. Apart of potentially useful information '
661            'you might see warnings from parsing packets like NDP or other '
662            'packets not related to the test being run. Use only when '
663            'developing because real tests expect empty stderr and stdout.'))
664
665    return parser.parse_args()
666
667
668def main():
669    args = parse_args()
670
671    if args.verbose:
672        LOGGER.setLevel(logging.DEBUG)
673
674    # Split parameters into send and expect parameters. Parameters might be
675    # missing from the command line, always fill the dictionaries with None.
676    send_params = {}
677    expect_params = {}
678    for param_name in (
679        'flags', 'hlim', 'length', 'mss', 'seq', 'tc', 'frag_length',
680        'sport', 'dport',
681    ):
682        param_arg = vars(args).get(f'send_{param_name}')
683        send_params[param_name] = param_arg if param_arg else None
684        param_arg = vars(args).get(f'expect_{param_name}')
685        expect_params[param_name] = param_arg if param_arg else None
686
687    expect_params['length'] = send_params['length']
688    send_params['tcpopt_unaligned'] = args.send_tcpopt_unaligned
689    send_params['nop'] = args.send_nop
690    send_params['src_address'] = args.fromaddr if args.fromaddr else None
691    send_params['dst_address'] = args.to
692    send_params['sendif'] = args.sendif
693
694    # We may not have a default route. Tell scapy where to start looking for routes
695    sp.conf.iface6 = args.sendif
696
697    # Configuration sanity checking.
698    if not (args.replyif or args.recvif):
699        raise Exception('With no reply or recv interface specified no traffic '
700            'can be sniffed and verified!'
701        )
702
703    sniffers = []
704
705    if send_params['frag_length']:
706        if (
707            (send_params['src_address'] and ':' in send_params['src_address']) or
708            (send_params['dst_address'] and ':' in send_params['dst_address'])
709        ):
710            defrag = 'IPv6'
711        else:
712            defrag = 'IPv4'
713    else:
714        defrag = False
715
716    if args.recvif:
717        sniffer_params = copy(expect_params)
718        sniffer_params['src_address'] = None
719        sniffer_params['dst_address'] = args.to
720        for iface in args.recvif:
721            LOGGER.debug(f'Installing receive sniffer on {iface}')
722            sniffers.append(
723                setup_sniffer(iface, args.ping_type, 'request',
724                              sniffer_params, defrag, send_params,
725            ))
726
727    if args.replyif:
728        sniffer_params = copy(expect_params)
729        sniffer_params['src_address'] = args.to
730        sniffer_params['dst_address'] = None
731        for iface in args.replyif:
732            LOGGER.debug(f'Installing reply sniffer on {iface}')
733            sniffers.append(
734                setup_sniffer(iface, args.ping_type, 'reply',
735                              sniffer_params, defrag, send_params,
736            ))
737
738    LOGGER.debug(f'Installed {len(sniffers)} sniffers')
739
740    send_ping(args.ping_type, send_params)
741
742    err = 0
743    sniffer_num = 0
744    for sniffer in sniffers:
745        sniffer.join()
746        if sniffer.correctPackets == 1:
747            LOGGER.debug(f'Expected ping has been sniffed on {sniffer._recvif}.')
748        else:
749            # Set a bit in err for each failed sniffer.
750            err |= 1<<sniffer_num
751            if sniffer.correctPackets > 1:
752                LOGGER.debug(f'Duplicated ping has been sniffed on {sniffer._recvif}!')
753            else:
754                LOGGER.debug(f'Expected ping has not been sniffed on {sniffer._recvif}!')
755        sniffer_num += 1
756
757    return err
758
759
760if __name__ == '__main__':
761    sys.exit(main())
762