xref: /freebsd/tests/sys/netpfil/common/pft_ping.py (revision 3d39eadcdeb301e95abdc94b1ad5d1255fa0f446)
1#!/usr/bin/env python3
2#
3# SPDX-License-Identifier: BSD-2-Clause
4#
5# Copyright (c) 2017 Kristof Provost <kp@FreeBSD.org>
6# Copyright (c) 2023 Kajetan Staszkiewicz <vegeta@tuxpowered.net>
7#
8# Redistribution and use in source and binary forms, with or without
9# modification, are permitted provided that the following conditions
10# are met:
11# 1. Redistributions of source code must retain the above copyright
12#    notice, this list of conditions and the following disclaimer.
13# 2. Redistributions in binary form must reproduce the above copyright
14#    notice, this list of conditions and the following disclaimer in the
15#    documentation and/or other materials provided with the distribution.
16#
17# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
18# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20# ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
21# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
22# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
23# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
24# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
26# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
27# SUCH DAMAGE.
28#
29
30import argparse
31import logging
32logging.getLogger("scapy").setLevel(logging.CRITICAL)
33import math
34import scapy.all as sp
35import sys
36import socket
37
38from copy import copy
39from sniffer import Sniffer
40
41logging.basicConfig(format='%(message)s')
42LOGGER = logging.getLogger(__name__)
43
44PAYLOAD_MAGIC = bytes.fromhex('42c0ffee')
45
46def build_payload(l):
47    pl = len(PAYLOAD_MAGIC)
48    ret = PAYLOAD_MAGIC * math.floor(l/pl)
49    ret += PAYLOAD_MAGIC[0:(l % pl)]
50    return ret
51
52
53def clean_params(params):
54    # Prepare a copy of safe copy of params
55    ret = copy(params)
56    ret.pop('src_address')
57    ret.pop('dst_address')
58    ret.pop('flags')
59    return ret
60
61def prepare_ipv6(send_params):
62    src_address = send_params.get('src_address')
63    dst_address = send_params.get('dst_address')
64    hlim = send_params.get('hlim')
65    tc = send_params.get('tc')
66    fl = send_params.get('fl')
67    ip6 = sp.IPv6(dst=dst_address)
68    if src_address:
69        ip6.src = src_address
70    if hlim:
71        ip6.hlim = hlim
72    if tc:
73        ip6.tc = tc
74    if fl:
75        ip6.fl = fl
76    return ip6
77
78
79def prepare_ipv4(send_params):
80    src_address = send_params.get('src_address')
81    dst_address = send_params.get('dst_address')
82    flags = send_params.get('flags')
83    tos = send_params.get('tc')
84    ttl = send_params.get('hlim')
85    opt = send_params.get('nop')
86    options = ''
87    if opt:
88        options='\x00'
89    ip = sp.IP(dst=dst_address, options=options)
90    if src_address:
91        ip.src = src_address
92    if flags:
93        ip.flags = flags
94    if tos:
95        ip.tos = tos
96    if ttl:
97        ip.ttl = ttl
98    return ip
99
100
101def send_icmp_ping(send_params):
102    send_length = send_params['length']
103    send_frag_length = send_params['frag_length']
104    packets = []
105    ether = sp.Ether()
106    if ':' in send_params['dst_address']:
107        ip6 = prepare_ipv6(send_params)
108        icmp = sp.ICMPv6EchoRequest(data=sp.raw(build_payload(send_length)))
109        if send_frag_length:
110            for packet in sp.fragment6(ip6 / icmp, fragSize=send_frag_length):
111                packets.append(ether / packet)
112        else:
113            packets.append(ether / ip6 / icmp)
114
115    else:
116        ip = prepare_ipv4(send_params)
117        icmp = sp.ICMP(type='echo-request')
118        raw = sp.raw(build_payload(send_length))
119        if send_frag_length:
120            for packet in sp.fragment(ip / icmp / raw, fragsize=send_frag_length):
121                packets.append(ether / packet)
122        else:
123            packets.append(ether / ip / icmp / raw)
124    for packet in packets:
125        sp.sendp(packet, iface=send_params['sendif'], verbose=False)
126
127
128def send_tcp_syn(send_params):
129    tcpopt_unaligned = send_params.get('tcpopt_unaligned')
130    seq = send_params.get('seq')
131    mss = send_params.get('mss')
132    ether = sp.Ether()
133    opts=[('Timestamp', (1, 1)), ('MSS', mss if mss else 1280)]
134    if tcpopt_unaligned:
135        opts = [('NOP', 0 )] + opts
136    if ':' in send_params['dst_address']:
137        ip = prepare_ipv6(send_params)
138    else:
139        ip = prepare_ipv4(send_params)
140    tcp = sp.TCP(
141        sport=send_params.get('sport'), dport=send_params.get('dport'),
142        flags='S', options=opts, seq=seq,
143    )
144    req = ether / ip / tcp
145    sp.sendp(req, iface=send_params['sendif'], verbose=False)
146
147
148def send_udp(send_params):
149    LOGGER.debug(f'Sending UDP ping')
150    packets = []
151    send_length = send_params['length']
152    send_frag_length = send_params['frag_length']
153    ether = sp.Ether()
154    if ':' in send_params['dst_address']:
155        ip6 = prepare_ipv6(send_params)
156        udp = sp.UDP(
157            sport=send_params.get('sport'), dport=send_params.get('dport'),
158        )
159        raw = sp.Raw(load=build_payload(send_length))
160        if send_frag_length:
161            for packet in sp.fragment6(ip6 / udp / raw, fragSize=send_frag_length):
162                packets.append(ether / packet)
163        else:
164            packets.append(ether / ip6 / udp / raw)
165    else:
166        ip = prepare_ipv4(send_params)
167        udp = sp.UDP(
168            sport=send_params.get('sport'), dport=send_params.get('dport'),
169        )
170        raw = sp.Raw(load=build_payload(send_length))
171        if send_frag_length:
172            for packet in sp.fragment(ip / udp / raw, fragsize=send_frag_length):
173                packets.append(ether / packet)
174        else:
175            packets.append(ether / ip / udp / raw)
176
177    for packet in packets:
178        sp.sendp(packet, iface=send_params['sendif'], verbose=False)
179
180
181def send_ping(ping_type, send_params):
182    if ping_type == 'icmp':
183        send_icmp_ping(send_params)
184    elif (
185        ping_type == 'tcpsyn' or
186        ping_type == 'tcp3way'
187    ):
188        send_tcp_syn(send_params)
189    elif ping_type == 'udp':
190        send_udp(send_params)
191    else:
192        raise Exception('Unsupported ping type')
193
194
195def check_ipv4(expect_params, packet):
196    src_address = expect_params.get('src_address')
197    dst_address = expect_params.get('dst_address')
198    flags = expect_params.get('flags')
199    tos = expect_params.get('tc')
200    ttl = expect_params.get('hlim')
201    ip = packet.getlayer(sp.IP)
202    LOGGER.debug(f'Packet: {ip}')
203    if not ip:
204        LOGGER.debug('Packet is not IPv4!')
205        return False
206    if src_address and ip.src != src_address:
207        LOGGER.debug(f'Wrong IPv4 source {ip.src}, expected {src_address}')
208        return False
209    if dst_address and ip.dst != dst_address:
210        LOGGER.debug(f'Wrong IPv4 destination {ip.dst}, expected {dst_address}')
211        return False
212    if flags and ip.flags != flags:
213        LOGGER.debug(f'Wrong IP flags value {ip.flags}, expected {flags}')
214        return False
215    if tos and ip.tos != tos:
216        LOGGER.debug(f'Wrong ToS value {ip.tos}, expected {tos}')
217        return False
218    if ttl and ip.ttl != ttl:
219        LOGGER.debug(f'Wrong TTL value {ip.ttl}, expected {ttl}')
220        return False
221    return True
222
223
224def check_ipv6(expect_params, packet):
225    src_address = expect_params.get('src_address')
226    dst_address = expect_params.get('dst_address')
227    flags = expect_params.get('flags')
228    hlim = expect_params.get('hlim')
229    tc = expect_params.get('tc')
230    fl = expect_params.get('fl')
231    ip6 = packet.getlayer(sp.IPv6)
232    if not ip6:
233        LOGGER.debug('Packet is not IPv6!')
234        return False
235    if src_address and socket.inet_pton(socket.AF_INET6, ip6.src) != \
236      socket.inet_pton(socket.AF_INET6, src_address):
237        LOGGER.debug(f'Wrong IPv6 source {ip6.src}, expected {src_address}')
238        return False
239    if dst_address and socket.inet_pton(socket.AF_INET6, ip6.dst) != \
240      socket.inet_pton(socket.AF_INET6, dst_address):
241        LOGGER.debug(f'Wrong IPv6 destination {ip6.dst}, expected {dst_address}')
242        return False
243    # IPv6 has no IP-level checksum.
244    if flags:
245        raise Exception("There's no fragmentation flags in IPv6")
246    if hlim and ip6.hlim != hlim:
247        LOGGER.debug(f'Wrong Hop Limit value {ip6.hlim}, expected {hlim}')
248        return False
249    if tc and ip6.tc != tc:
250        LOGGER.debug(f'Wrong TC value {ip6.tc}, expected {tc}')
251        return False
252    if fl and ip6.fl != fl:
253        LOGGER.debug(f'Wrong Flow Label value {ip6.fl}, expected {fl}')
254        return False
255    return True
256
257
258def check_ping_4(expect_params, packet):
259    expect_length = expect_params['length']
260    if not check_ipv4(expect_params, packet):
261        return False
262    icmp = packet.getlayer(sp.ICMP)
263    if not icmp:
264        LOGGER.debug('Packet is not IPv4 ICMP!')
265        return False
266    raw = packet.getlayer(sp.Raw)
267    if not raw:
268        LOGGER.debug('Packet contains no payload!')
269        return False
270    if raw.load != build_payload(expect_length):
271        LOGGER.debug('Payload magic does not match!')
272        return False
273    return True
274
275
276def check_ping_request_4(expect_params, packet):
277    if not check_ping_4(expect_params, packet):
278        return False
279    icmp = packet.getlayer(sp.ICMP)
280    if sp.icmptypes[icmp.type] != 'echo-request':
281        LOGGER.debug('Packet is not IPv4 ICMP Echo Request!')
282        return False
283    return True
284
285
286def check_ping_reply_4(expect_params, packet):
287    if not check_ping_4(expect_params, packet):
288        return False
289    icmp = packet.getlayer(sp.ICMP)
290    if sp.icmptypes[icmp.type] != 'echo-reply':
291        LOGGER.debug('Packet is not IPv4 ICMP Echo Reply!')
292        return False
293    return True
294
295
296def check_ping_request_6(expect_params, packet):
297    expect_length = expect_params['length']
298    if not check_ipv6(expect_params, packet):
299        return False
300    icmp = packet.getlayer(sp.ICMPv6EchoRequest)
301    if not icmp:
302        LOGGER.debug('Packet is not IPv6 ICMP Echo Request!')
303        return False
304    if icmp.data != build_payload(expect_length):
305        LOGGER.debug('Payload magic does not match!')
306        return False
307    return True
308
309
310def check_ping_reply_6(expect_params, packet):
311    expect_length = expect_params['length']
312    if not check_ipv6(expect_params, packet):
313        return False
314    icmp = packet.getlayer(sp.ICMPv6EchoReply)
315    if not icmp:
316        LOGGER.debug('Packet is not IPv6 ICMP Echo Reply!')
317        return False
318    if icmp.data != build_payload(expect_length):
319        LOGGER.debug('Payload magic does not match!')
320        return False
321    return True
322
323
324def check_ping_request(args, packet):
325    src_address = args['expect_params'].get('src_address')
326    dst_address = args['expect_params'].get('dst_address')
327    if not (src_address or dst_address):
328        raise Exception('Source or destination address must be given to match the ping request!')
329    if (
330        (src_address and ':' in src_address) or
331        (dst_address and ':' in dst_address)
332    ):
333        return check_ping_request_6(args['expect_params'], packet)
334    else:
335        return check_ping_request_4(args['expect_params'], packet)
336
337
338def check_ping_reply(args, packet):
339    src_address = args['expect_params'].get('src_address')
340    dst_address = args['expect_params'].get('dst_address')
341    if not (src_address or dst_address):
342        raise Exception('Source or destination address must be given to match the ping reply!')
343    if (
344        (src_address and ':' in src_address) or
345        (dst_address and ':' in dst_address)
346    ):
347        return check_ping_reply_6(args['expect_params'], packet)
348    else:
349        return check_ping_reply_4(args['expect_params'], packet)
350
351
352def check_tcp(expect_params, packet):
353    tcp_flags = expect_params.get('tcp_flags')
354    mss = expect_params.get('mss')
355    seq = expect_params.get('seq')
356    tcp = packet.getlayer(sp.TCP)
357    if not tcp:
358        LOGGER.debug('Packet is not TCP!')
359        return False
360    chksum = tcp.chksum
361    tcp.chksum = None
362    newpacket = sp.Ether(sp.raw(packet[sp.Ether]))
363    new_chksum = newpacket[sp.TCP].chksum
364    if new_chksum and chksum != new_chksum:
365        LOGGER.debug(f'Wrong TCP checksum {chksum}, expected {new_chksum}!')
366        return False
367    if tcp_flags and tcp.flags != tcp_flags:
368        LOGGER.debug(f'Wrong TCP flags {tcp.flags}, expected {tcp_flags}!')
369        return False
370    if seq:
371        if tcp_flags == 'S':
372            tcp_seq = tcp.seq
373        elif tcp_flags == 'SA':
374            tcp_seq = tcp.ack - 1
375        if seq != tcp_seq:
376            LOGGER.debug(f'Wrong TCP Sequence Number {tcp_seq}, expected {seq}')
377            return False
378    if mss:
379        for option in tcp.options:
380            if option[0] == 'MSS':
381                if option[1] != mss:
382                    LOGGER.debug(f'Wrong TCP MSS {option[1]}, expected {mss}')
383                    return False
384    return True
385
386
387def check_udp(expect_params, packet):
388    expect_length = expect_params['length']
389    udp = packet.getlayer(sp.UDP)
390    if not udp:
391        LOGGER.debug('Packet is not UDP!')
392        return False
393    raw = packet.getlayer(sp.Raw)
394    if not raw:
395        LOGGER.debug('Packet contains no payload!')
396        return False
397    if raw.load != build_payload(expect_length):
398        LOGGER.debug(f'Payload magic does not match len {len(raw.load)} vs {expect_length}!')
399        return False
400    orig_chksum = udp.chksum
401    udp.chksum = None
402    newpacket = sp.Ether(sp.raw(packet[sp.Ether]))
403    new_chksum = newpacket[sp.UDP].chksum
404    if new_chksum and orig_chksum != new_chksum:
405        LOGGER.debug(f'Wrong UDP checksum {orig_chksum}, expected {new_chksum}!')
406        return False
407
408    return True
409
410
411def check_tcp_syn_request_4(expect_params, packet):
412    if not check_ipv4(expect_params, packet):
413        return False
414    if not check_tcp(expect_params | {'tcp_flags': 'S'}, packet):
415        return False
416    return True
417
418
419def check_tcp_syn_reply_4(send_params, expect_params, packet):
420    if not check_ipv4(expect_params, packet):
421        return False
422    if not check_tcp(expect_params | {'tcp_flags': 'SA'}, packet):
423        return False
424    return True
425
426
427def check_tcp_3way_4(args, packet):
428    send_params = args['send_params']
429
430    expect_params_sa = clean_params(args['expect_params'])
431    expect_params_sa['src_address'] = send_params['dst_address']
432    expect_params_sa['dst_address'] = send_params['src_address']
433
434    # Sniff incoming SYN+ACK packet
435    if (
436        check_ipv4(expect_params_sa, packet) and
437        check_tcp(expect_params_sa | {'tcp_flags': 'SA'}, packet)
438    ):
439        ether = sp.Ether()
440        ip_sa = packet.getlayer(sp.IP)
441        tcp_sa = packet.getlayer(sp.TCP)
442        reply_params = clean_params(send_params)
443        reply_params['src_address'] = ip_sa.dst
444        reply_params['dst_address'] = ip_sa.src
445        ip_a = prepare_ipv4(reply_params)
446        tcp_a = sp.TCP(
447            sport=tcp_sa.dport, dport=tcp_sa.sport, flags='A',
448            seq=tcp_sa.ack, ack=tcp_sa.seq + 1,
449        )
450        req = ether / ip_a / tcp_a
451        sp.sendp(req, iface=send_params['sendif'], verbose=False)
452        return True
453
454    return False
455
456
457def check_udp_request_4(expect_params, packet):
458    if not check_ipv4(expect_params, packet):
459        return False
460    if not check_udp(expect_params, packet):
461        return False
462    return True
463
464
465def check_tcp_syn_request_6(expect_params, packet):
466    if not check_ipv6(expect_params, packet):
467        return False
468    if not check_tcp(expect_params | {'tcp_flags': 'S'}, packet):
469        return False
470    return True
471
472
473def check_tcp_syn_reply_6(expect_params, packet):
474    if not check_ipv6(expect_params, packet):
475        return False
476    if not check_tcp(expect_params | {'tcp_flags': 'SA'}, packet):
477        return False
478    return True
479
480
481def check_tcp_3way_6(args, packet):
482    send_params = args['send_params']
483
484    expect_params_sa = clean_params(args['expect_params'])
485    expect_params_sa['src_address'] = send_params['dst_address']
486    expect_params_sa['dst_address'] = send_params['src_address']
487
488    # Sniff incoming SYN+ACK packet
489    if (
490        check_ipv6(expect_params_sa, packet) and
491        check_tcp(expect_params_sa | {'tcp_flags': 'SA'}, packet)
492    ):
493        ether = sp.Ether()
494        ip6_sa = packet.getlayer(sp.IPv6)
495        tcp_sa = packet.getlayer(sp.TCP)
496        reply_params = clean_params(send_params)
497        reply_params['src_address'] = ip6_sa.dst
498        reply_params['dst_address'] = ip6_sa.src
499        ip_a = prepare_ipv6(reply_params)
500        tcp_a = sp.TCP(
501            sport=tcp_sa.dport, dport=tcp_sa.sport, flags='A',
502            seq=tcp_sa.ack, ack=tcp_sa.seq + 1,
503        )
504        req = ether / ip_a / tcp_a
505        sp.sendp(req, iface=send_params['sendif'], verbose=False)
506        return True
507
508    return False
509
510
511def check_udp_request_6(expect_params, packet):
512    if not check_ipv6(expect_params, packet):
513        return False
514    if not check_udp(expect_params, packet):
515        return False
516    return True
517
518def check_tcp_syn_request(args, packet):
519    expect_params = args['expect_params']
520    src_address = expect_params.get('src_address')
521    dst_address = expect_params.get('dst_address')
522    if not (src_address or dst_address):
523        raise Exception('Source or destination address must be given to match the tcp syn request!')
524    if (
525        (src_address and ':' in src_address) or
526        (dst_address and ':' in dst_address)
527    ):
528        return check_tcp_syn_request_6(expect_params, packet)
529    else:
530        return check_tcp_syn_request_4(expect_params, packet)
531
532
533def check_tcp_syn_reply(args, packet):
534    expect_params = args['expect_params']
535    src_address = expect_params.get('src_address')
536    dst_address = expect_params.get('dst_address')
537    if not (src_address or dst_address):
538        raise Exception('Source or destination address must be given to match the tcp syn reply!')
539    if (
540        (src_address and ':' in src_address) or
541        (dst_address and ':' in dst_address)
542    ):
543        return check_tcp_syn_reply_6(expect_params, packet)
544    else:
545        return check_tcp_syn_reply_4(expect_params, packet)
546
547def check_tcp_3way(args, packet):
548    expect_params = args['expect_params']
549    src_address = expect_params.get('src_address')
550    dst_address = expect_params.get('dst_address')
551    if not (src_address or dst_address):
552        raise Exception('Source or destination address must be given to match the tcp syn reply!')
553    if (
554            (src_address and ':' in src_address) or
555            (dst_address and ':' in dst_address)
556    ):
557        return check_tcp_3way_6(args, packet)
558    else:
559        return check_tcp_3way_4(args, packet)
560
561
562def check_udp_request(args, packet):
563    expect_params = args['expect_params']
564    src_address = expect_params.get('src_address')
565    dst_address = expect_params.get('dst_address')
566    if not (src_address or dst_address):
567        raise Exception('Source or destination address must be given to match the tcp syn request!')
568    if (
569            (src_address and ':' in src_address) or
570            (dst_address and ':' in dst_address)
571    ):
572        return check_udp_request_6(expect_params, packet)
573    else:
574        return check_udp_request_4(expect_params, packet)
575
576
577def setup_sniffer(
578        recvif, ping_type, sniff_type, expect_params, defrag, send_params,
579):
580    if ping_type == 'icmp' and sniff_type == 'request':
581        checkfn = check_ping_request
582    elif ping_type == 'icmp' and sniff_type == 'reply':
583        checkfn = check_ping_reply
584    elif ping_type == 'tcpsyn' and sniff_type == 'request':
585        checkfn = check_tcp_syn_request
586    elif ping_type == 'tcpsyn' and sniff_type == 'reply':
587        checkfn = check_tcp_syn_reply
588    elif ping_type == 'tcp3way' and sniff_type == 'reply':
589        checkfn = check_tcp_3way
590    elif ping_type == 'udp' and sniff_type == 'request':
591        checkfn = check_udp_request
592    else:
593        raise Exception('Unspported ping and sniff type combination')
594
595    return Sniffer(
596        {'send_params': send_params, 'expect_params': expect_params},
597        checkfn, recvif, defrag=defrag,
598    )
599
600
601def parse_args():
602    parser = argparse.ArgumentParser("pft_ping.py",
603        description="Ping test tool")
604
605    # Parameters of sent ping request
606    parser.add_argument('--sendif', required=True,
607        help='The interface through which the packet(s) will be sent')
608    parser.add_argument('--to', required=True,
609        help='The destination IP address for the ping request')
610    parser.add_argument('--ping-type',
611        choices=('icmp', 'tcpsyn', 'tcp3way', 'udp'),
612        help='Type of ping: ICMP (default) or TCP SYN or 3-way TCP handshake',
613        default='icmp')
614    parser.add_argument('--fromaddr',
615        help='The source IP address for the ping request')
616
617    # Where to look for packets to analyze.
618    # The '+' format is ugly as it mixes positional with optional syntax.
619    # But we have no positional parameters so I guess it's fine to use it.
620    parser.add_argument('--recvif', nargs='+',
621        help='The interfaces on which to expect the ping request')
622    parser.add_argument('--replyif', nargs='+',
623        help='The interfaces which to expect the ping response')
624
625    # Packet settings
626    parser_send = parser.add_argument_group('Values set in transmitted packets')
627    parser_send.add_argument('--send-flags', type=str,
628        help='IPv4 fragmentation flags')
629    parser_send.add_argument('--send-frag-length', type=int,
630        help='Force IP fragmentation with given fragment length')
631    parser_send.add_argument('--send-hlim', type=int,
632        help='IPv6 Hop Limit or IPv4 Time To Live')
633    parser_send.add_argument('--send-mss', type=int,
634        help='TCP Maximum Segment Size')
635    parser_send.add_argument('--send-seq', type=int,
636        help='TCP sequence number')
637    parser_send.add_argument('--send-sport', type=int,
638        help='TCP source port')
639    parser_send.add_argument('--send-dport', type=int, default=9,
640        help='TCP destination port')
641    parser_send.add_argument('--send-length', type=int, default=len(PAYLOAD_MAGIC),
642        help='ICMP Echo Request payload size')
643    parser_send.add_argument('--send-tc', type=int,
644        help='IPv6 Traffic Class or IPv4 DiffServ / ToS')
645    parser_send.add_argument('--send-fl', type=int,
646        help='IPv6 Flow label')
647    parser_send.add_argument('--send-tcpopt-unaligned', action='store_true',
648        help='Include unaligned TCP options')
649    parser_send.add_argument('--send-nop', action='store_true',
650        help='Include a NOP IPv4 option')
651
652    # Expectations
653    parser_expect = parser.add_argument_group('Values expected in sniffed packets')
654    parser_expect.add_argument('--expect-flags', type=str,
655        help='IPv4 fragmentation flags')
656    parser_expect.add_argument('--expect-hlim', type=int,
657        help='IPv6 Hop Limit or IPv4 Time To Live')
658    parser_expect.add_argument('--expect-mss', type=int,
659        help='TCP Maximum Segment Size')
660    parser_send.add_argument('--expect-seq', type=int,
661        help='TCP sequence number')
662    parser_expect.add_argument('--expect-tc', type=int,
663        help='IPv6 Traffic Class or IPv4 DiffServ / ToS')
664    parser_expect.add_argument('--expect-fl', type=int,
665        help='IPv6 Flow Label')
666
667    parser.add_argument('-v', '--verbose', action='store_true',
668        help=('Enable verbose logging. Apart of potentially useful information '
669            'you might see warnings from parsing packets like NDP or other '
670            'packets not related to the test being run. Use only when '
671            'developing because real tests expect empty stderr and stdout.'))
672
673    return parser.parse_args()
674
675
676def main():
677    args = parse_args()
678
679    if args.verbose:
680        LOGGER.setLevel(logging.DEBUG)
681
682    # Split parameters into send and expect parameters. Parameters might be
683    # missing from the command line, always fill the dictionaries with None.
684    send_params = {}
685    expect_params = {}
686    for param_name in (
687        'flags', 'hlim', 'length', 'mss', 'seq', 'tc', 'fl', 'frag_length',
688        'sport', 'dport',
689    ):
690        param_arg = vars(args).get(f'send_{param_name}')
691        send_params[param_name] = param_arg if param_arg else None
692        param_arg = vars(args).get(f'expect_{param_name}')
693        expect_params[param_name] = param_arg if param_arg else None
694
695    expect_params['length'] = send_params['length']
696    send_params['tcpopt_unaligned'] = args.send_tcpopt_unaligned
697    send_params['nop'] = args.send_nop
698    send_params['src_address'] = args.fromaddr if args.fromaddr else None
699    send_params['dst_address'] = args.to
700    send_params['sendif'] = args.sendif
701
702    # We may not have a default route. Tell scapy where to start looking for routes
703    sp.conf.iface6 = args.sendif
704
705    # Configuration sanity checking.
706    if not (args.replyif or args.recvif):
707        raise Exception('With no reply or recv interface specified no traffic '
708            'can be sniffed and verified!'
709        )
710
711    sniffers = []
712
713    if send_params['frag_length']:
714        if (
715            (send_params['src_address'] and ':' in send_params['src_address']) or
716            (send_params['dst_address'] and ':' in send_params['dst_address'])
717        ):
718            defrag = 'IPv6'
719        else:
720            defrag = 'IPv4'
721    else:
722        defrag = False
723
724    if args.recvif:
725        sniffer_params = copy(expect_params)
726        sniffer_params['src_address'] = None
727        sniffer_params['dst_address'] = args.to
728        for iface in args.recvif:
729            LOGGER.debug(f'Installing receive sniffer on {iface}')
730            sniffers.append(
731                setup_sniffer(iface, args.ping_type, 'request',
732                              sniffer_params, defrag, send_params,
733            ))
734
735    if args.replyif:
736        sniffer_params = copy(expect_params)
737        sniffer_params['src_address'] = args.to
738        sniffer_params['dst_address'] = None
739        for iface in args.replyif:
740            LOGGER.debug(f'Installing reply sniffer on {iface}')
741            sniffers.append(
742                setup_sniffer(iface, args.ping_type, 'reply',
743                              sniffer_params, defrag, send_params,
744            ))
745
746    LOGGER.debug(f'Installed {len(sniffers)} sniffers')
747
748    send_ping(args.ping_type, send_params)
749
750    err = 0
751    sniffer_num = 0
752    for sniffer in sniffers:
753        sniffer.join()
754        if sniffer.correctPackets == 1:
755            LOGGER.debug(f'Expected ping has been sniffed on {sniffer._recvif}.')
756        else:
757            # Set a bit in err for each failed sniffer.
758            err |= 1<<sniffer_num
759            if sniffer.correctPackets > 1:
760                LOGGER.debug(f'Duplicated ping has been sniffed on {sniffer._recvif}!')
761            else:
762                LOGGER.debug(f'Expected ping has not been sniffed on {sniffer._recvif}!')
763        sniffer_num += 1
764
765    return err
766
767
768if __name__ == '__main__':
769    sys.exit(main())
770