xref: /freebsd/tests/sys/netpfil/common/pft_ping.py (revision 73b42eff2596a2bda45ce3ef7f505720ba0f2df0)
1#!/usr/bin/env python3
2#
3# SPDX-License-Identifier: BSD-2-Clause
4#
5# Copyright (c) 2017 Kristof Provost <kp@FreeBSD.org>
6# Copyright (c) 2023 Kajetan Staszkiewicz <vegeta@tuxpowered.net>
7#
8# Redistribution and use in source and binary forms, with or without
9# modification, are permitted provided that the following conditions
10# are met:
11# 1. Redistributions of source code must retain the above copyright
12#    notice, this list of conditions and the following disclaimer.
13# 2. Redistributions in binary form must reproduce the above copyright
14#    notice, this list of conditions and the following disclaimer in the
15#    documentation and/or other materials provided with the distribution.
16#
17# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
18# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20# ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
21# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
22# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
23# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
24# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
26# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
27# SUCH DAMAGE.
28#
29
30import argparse
31import logging
32logging.getLogger("scapy").setLevel(logging.CRITICAL)
33import math
34import scapy.all as sp
35import sys
36import socket
37
38from copy import copy
39from sniffer import Sniffer
40
# Log records are emitted as the bare message text, without a level or
# logger-name prefix.
logging.basicConfig(format='%(message)s')
# Module-wide logger; main() raises its level to DEBUG when --verbose is given.
LOGGER = logging.getLogger(__name__)

# Four-byte pattern (0x42 0xc0 0xff 0xee) that build_payload() repeats to
# produce packet payloads of an arbitrary length.
PAYLOAD_MAGIC = bytes.fromhex('42c0ffee')
45
def build_payload(l):
    """Return a payload of ``l`` bytes built from PAYLOAD_MAGIC.

    The magic pattern is repeated as many whole times as fits into ``l``
    and a leading slice of the pattern is appended to cover the remainder,
    e.g. a length of 6 yields the pattern followed by its first two bytes.
    """
    whole_copies, leftover = divmod(l, len(PAYLOAD_MAGIC))
    return PAYLOAD_MAGIC * whole_copies + PAYLOAD_MAGIC[:leftover]
51
52
def clean_params(params):
    """Return a shallow copy of ``params`` without the address and flag keys.

    The 'src_address', 'dst_address' and 'flags' entries are dropped so that
    the caller can fill in addresses for a different packet (for example the
    reverse direction of a connection) without inheriting stale values.

    ``params`` itself is never modified.  Keys that are not present are
    simply ignored instead of raising KeyError.
    """
    ret = copy(params)
    for key in ('src_address', 'dst_address', 'flags'):
        # The default makes pop() a no-op for keys the caller never set.
        ret.pop(key, None)
    return ret
60
def prepare_ipv6(send_params):
    """Build the scapy IPv6 header for a packet that is about to be sent.

    Keys read from ``send_params`` (all via ``.get()``):
      'dst_address' -- always passed to the header as destination;
      'src_address', 'hlim', 'tc' -- source address, hop limit and traffic
          class; each is applied only when its value is truthy, so a
          missing, None or zero value leaves the field as sp.IPv6() made it.

    Returns the sp.IPv6 object.
    """
    header = sp.IPv6(dst=send_params.get('dst_address'))
    if (source := send_params.get('src_address')):
        header.src = source
    if (hop_limit := send_params.get('hlim')):
        header.hlim = hop_limit
    if (traffic_class := send_params.get('tc')):
        header.tc = traffic_class
    return header
74
75
def prepare_ipv4(send_params):
    """Build the scapy IPv4 header for a packet that is about to be sent.

    Keys read from ``send_params`` (all via ``.get()``):
      'dst_address' -- always passed to the header as destination;
      'nop'         -- when truthy, a one-byte options string is attached;
      'src_address', 'flags', 'tc', 'hlim' -- source address, IP flags,
          ToS and TTL; each is applied only when its value is truthy.

    The protocol-neutral names 'tc' and 'hlim' are shared with the IPv6
    code path and map to the IPv4 ``tos`` and ``ttl`` fields here.

    Returns the sp.IP object.
    """
    # NOTE(review): the option byte is 0x00; RFC 791 lists option type 0 as
    # "End of Option List" and type 1 as "No Operation" -- confirm that 0x00
    # is what the 'nop' request is meant to produce.
    options = '\x00' if send_params.get('nop') else ''
    header = sp.IP(dst=send_params.get('dst_address'), options=options)
    if (source := send_params.get('src_address')):
        header.src = source
    if (ip_flags := send_params.get('flags')):
        header.flags = ip_flags
    if (type_of_service := send_params.get('tc')):
        header.tos = type_of_service
    if (time_to_live := send_params.get('hlim')):
        header.ttl = time_to_live
    return header
96
97
def send_icmp_ping(send_params):
    """Send an ICMP or ICMPv6 echo request, optionally fragmented.

    Required keys of ``send_params``: 'length' (payload size handed to
    build_payload()), 'frag_length' (fragment size, falsy for no
    fragmentation), 'dst_address' and 'sendif'.  A ':' in the destination
    address selects the IPv6 code path, anything else is treated as IPv4.

    Every resulting packet is wrapped in an Ethernet header and sent on
    the 'sendif' interface.
    """
    send_length = send_params['length']
    frag_size = send_params['frag_length']
    ether = sp.Ether()

    if ':' in send_params['dst_address']:
        # ICMPv6 echo requests carry the payload in their 'data' field.
        datagram = prepare_ipv6(send_params) / sp.ICMPv6EchoRequest(
            data=sp.raw(build_payload(send_length)))
        if frag_size:
            pieces = sp.fragment6(datagram, fragSize=frag_size)
        else:
            pieces = [datagram]
    else:
        # For IPv4 the payload follows the ICMP header as raw bytes.
        datagram = (
            prepare_ipv4(send_params)
            / sp.ICMP(type='echo-request')
            / sp.raw(build_payload(send_length))
        )
        if frag_size:
            pieces = sp.fragment(datagram, fragsize=frag_size)
        else:
            pieces = [datagram]

    frames = [ether / piece for piece in pieces]
    for frame in frames:
        sp.sendp(frame, iface=send_params['sendif'], verbose=False)
123
124
def send_tcp_syn(send_params):
    """Send one TCP SYN segment on the 'sendif' interface.

    Keys read from ``send_params``: 'dst_address' and 'sendif' (required),
    plus the optional 'sport', 'dport', 'seq', 'mss' and
    'tcpopt_unaligned'.  A ':' in the destination address selects IPv6.

    The segment always carries a Timestamp option and an MSS option; the
    MSS value is 1280 unless a truthy 'mss' was supplied.
    """
    requested_mss = send_params.get('mss')
    options = [('Timestamp', (1, 1)), ('MSS', requested_mss or 1280)]
    if send_params.get('tcpopt_unaligned'):
        # The "unaligned options" request is implemented by putting a
        # single NOP option in front of the regular ones.
        options.insert(0, ('NOP', 0))

    if ':' in send_params['dst_address']:
        network = prepare_ipv6(send_params)
    else:
        network = prepare_ipv4(send_params)

    segment = sp.TCP(
        sport=send_params.get('sport'),
        dport=send_params.get('dport'),
        flags='S',
        options=options,
        seq=send_params.get('seq'),
    )
    frame = sp.Ether() / network / segment
    sp.sendp(frame, iface=send_params['sendif'], verbose=False)
143
144
def send_udp(send_params):
    """Send a UDP datagram with the magic payload, optionally fragmented.

    Required keys of ``send_params``: 'length' (payload size handed to
    build_payload()), 'frag_length' (fragment size, falsy for no
    fragmentation), 'dst_address' and 'sendif'; 'sport' and 'dport' are
    optional.  A ':' in the destination address selects IPv6.

    Every resulting packet is wrapped in an Ethernet header and sent on
    the 'sendif' interface.
    """
    LOGGER.debug('Sending UDP ping')
    send_length = send_params['length']
    frag_size = send_params['frag_length']
    ether = sp.Ether()

    # The transport header and payload are the same for both IP versions.
    transport = sp.UDP(
        sport=send_params.get('sport'), dport=send_params.get('dport'),
    )
    body = sp.Raw(load=build_payload(send_length))

    if ':' in send_params['dst_address']:
        datagram = prepare_ipv6(send_params) / transport / body
        if frag_size:
            pieces = sp.fragment6(datagram, fragSize=frag_size)
        else:
            pieces = [datagram]
    else:
        datagram = prepare_ipv4(send_params) / transport / body
        if frag_size:
            pieces = sp.fragment(datagram, fragsize=frag_size)
        else:
            pieces = [datagram]

    frames = [ether / piece for piece in pieces]
    for frame in frames:
        sp.sendp(frame, iface=send_params['sendif'], verbose=False)
176
177
def send_ping(ping_type, send_params):
    """Send the initial packet for the requested kind of ping.

    'icmp' sends an echo request, 'udp' a UDP datagram; both 'tcpsyn' and
    'tcp3way' start with the same TCP SYN.  Any other ``ping_type`` raises
    Exception.
    """
    if ping_type in ('tcpsyn', 'tcp3way'):
        send_tcp_syn(send_params)
        return
    if ping_type == 'icmp':
        send_icmp_ping(send_params)
        return
    if ping_type == 'udp':
        send_udp(send_params)
        return
    raise Exception('Unsupported ping type')
190
191
def check_ipv4(expect_params, packet):
    """Check the IPv4 header of a sniffed packet against expectations.

    ``expect_params`` may hold 'src_address', 'dst_address', 'flags',
    'tc' (compared with the ToS field) and 'hlim' (compared with the TTL).
    Only expectations with a truthy value are verified.

    Returns True when the packet has an IPv4 layer and every verified
    field matches, False otherwise; the reason is logged at debug level.
    """
    header = packet.getlayer(sp.IP)
    LOGGER.debug(f'Packet: {header}')
    if not header:
        LOGGER.debug('Packet is not IPv4!')
        return False

    want_src = expect_params.get('src_address')
    if want_src and header.src != want_src:
        LOGGER.debug(f'Wrong IPv4 source {header.src}, expected {want_src}')
        return False

    want_dst = expect_params.get('dst_address')
    if want_dst and header.dst != want_dst:
        LOGGER.debug(f'Wrong IPv4 destination {header.dst}, expected {want_dst}')
        return False

    want_flags = expect_params.get('flags')
    if want_flags and header.flags != want_flags:
        LOGGER.debug(f'Wrong IP flags value {header.flags}, expected {want_flags}')
        return False

    want_tos = expect_params.get('tc')
    if want_tos and header.tos != want_tos:
        LOGGER.debug(f'Wrong ToS value {header.tos}, expected {want_tos}')
        return False

    want_ttl = expect_params.get('hlim')
    if want_ttl and header.ttl != want_ttl:
        LOGGER.debug(f'Wrong TTL value {header.ttl}, expected {want_ttl}')
        return False

    return True
219
220
def check_ipv6(expect_params, packet):
    """Check the IPv6 header of a sniffed packet against expectations.

    ``expect_params`` may hold 'src_address', 'dst_address', 'hlim' and
    'tc'; only expectations with a truthy value are verified.  Addresses
    are compared in packed binary form, so different textual spellings of
    the same address are considered equal.

    Returns True when the packet has an IPv6 layer and every verified
    field matches, False otherwise; the reason is logged at debug level.

    Raises Exception when a truthy 'flags' expectation is present and the
    address checks have passed.
    """
    header = packet.getlayer(sp.IPv6)
    if not header:
        LOGGER.debug('Packet is not IPv6!')
        return False

    def packed(address):
        """Convert a textual IPv6 address to its 16-byte binary form."""
        return socket.inet_pton(socket.AF_INET6, address)

    want_src = expect_params.get('src_address')
    if want_src and packed(header.src) != packed(want_src):
        LOGGER.debug(f'Wrong IPv6 source {header.src}, expected {want_src}')
        return False

    want_dst = expect_params.get('dst_address')
    if want_dst and packed(header.dst) != packed(want_dst):
        LOGGER.debug(f'Wrong IPv6 destination {header.dst}, expected {want_dst}')
        return False

    # Expecting IPv4-style fragmentation flags on an IPv6 packet is a
    # usage error rather than a mismatch, hence the exception.
    if expect_params.get('flags'):
        raise Exception("There's no fragmentation flags in IPv6")

    want_hlim = expect_params.get('hlim')
    if want_hlim and header.hlim != want_hlim:
        LOGGER.debug(f'Wrong Hop Limit value {header.hlim}, expected {want_hlim}')
        return False

    want_tc = expect_params.get('tc')
    if want_tc and header.tc != want_tc:
        LOGGER.debug(f'Wrong TC value {header.tc}, expected {want_tc}')
        return False

    return True
249
250
def check_ping_4(expect_params, packet):
    """Check the parts common to IPv4 ICMP echo requests and replies.

    The packet must pass check_ipv4(), contain an ICMP layer and carry a
    raw payload equal to build_payload(expect_params['length']).  The ICMP
    type itself is not inspected here.

    Returns True on a match, False otherwise (reason logged at debug).
    """
    expect_length = expect_params['length']
    if not check_ipv4(expect_params, packet):
        return False
    if not packet.getlayer(sp.ICMP):
        LOGGER.debug('Packet is not IPv4 ICMP!')
        return False
    body = packet.getlayer(sp.Raw)
    if not body:
        LOGGER.debug('Packet contains no payload!')
        return False
    if body.load == build_payload(expect_length):
        return True
    LOGGER.debug('Payload magic does not match!')
    return False
267
268
def check_ping_request_4(expect_params, packet):
    """Return True when the packet is the expected IPv4 ICMP echo request.

    The packet must pass check_ping_4() and its ICMP type must be named
    'echo-request' in scapy's icmptypes table.
    """
    if check_ping_4(expect_params, packet):
        type_name = sp.icmptypes[packet.getlayer(sp.ICMP).type]
        if type_name == 'echo-request':
            return True
        LOGGER.debug('Packet is not IPv4 ICMP Echo Request!')
    return False
277
278
def check_ping_reply_4(expect_params, packet):
    """Return True when the packet is the expected IPv4 ICMP echo reply.

    The packet must pass check_ping_4() and its ICMP type must be named
    'echo-reply' in scapy's icmptypes table.
    """
    if check_ping_4(expect_params, packet):
        type_name = sp.icmptypes[packet.getlayer(sp.ICMP).type]
        if type_name == 'echo-reply':
            return True
        LOGGER.debug('Packet is not IPv4 ICMP Echo Reply!')
    return False
287
288
def check_ping_request_6(expect_params, packet):
    """Return True when the packet is the expected ICMPv6 echo request.

    The packet must pass check_ipv6(), contain an ICMPv6EchoRequest layer
    and carry data equal to build_payload(expect_params['length']).
    Mismatches are logged at debug level.
    """
    expect_length = expect_params['length']
    if not check_ipv6(expect_params, packet):
        return False
    echo = packet.getlayer(sp.ICMPv6EchoRequest)
    if not echo:
        LOGGER.debug('Packet is not IPv6 ICMP Echo Request!')
        return False
    if echo.data == build_payload(expect_length):
        return True
    LOGGER.debug('Payload magic does not match!')
    return False
301
302
def check_ping_reply_6(expect_params, packet):
    """Return True when the packet is the expected ICMPv6 echo reply.

    The packet must pass check_ipv6(), contain an ICMPv6EchoReply layer
    and carry data equal to build_payload(expect_params['length']).
    Mismatches are logged at debug level.
    """
    expect_length = expect_params['length']
    if not check_ipv6(expect_params, packet):
        return False
    echo = packet.getlayer(sp.ICMPv6EchoReply)
    if not echo:
        LOGGER.debug('Packet is not IPv6 ICMP Echo Reply!')
        return False
    if echo.data == build_payload(expect_length):
        return True
    LOGGER.debug('Payload magic does not match!')
    return False
315
316
def check_ping_request(args, packet):
    """Dispatch an echo-request check to the IPv4 or IPv6 implementation.

    ``args['expect_params']`` must contain a truthy 'src_address' or
    'dst_address'; a ':' in either of them selects the IPv6 checker.

    Returns the checker's verdict.  Raises Exception when neither address
    is given.
    """
    expect_params = args['expect_params']
    addresses = (
        expect_params.get('src_address'),
        expect_params.get('dst_address'),
    )
    if not any(addresses):
        raise Exception('Source or destination address must be given to match the ping request!')
    if any(address and ':' in address for address in addresses):
        return check_ping_request_6(expect_params, packet)
    return check_ping_request_4(expect_params, packet)
329
330
def check_ping_reply(args, packet):
    """Dispatch an echo-reply check to the IPv4 or IPv6 implementation.

    ``args['expect_params']`` must contain a truthy 'src_address' or
    'dst_address'; a ':' in either of them selects the IPv6 checker.

    Returns the checker's verdict.  Raises Exception when neither address
    is given.
    """
    expect_params = args['expect_params']
    addresses = (
        expect_params.get('src_address'),
        expect_params.get('dst_address'),
    )
    if not any(addresses):
        raise Exception('Source or destination address must be given to match the ping reply!')
    if any(address and ':' in address for address in addresses):
        return check_ping_reply_6(expect_params, packet)
    return check_ping_reply_4(expect_params, packet)
343
344
def check_tcp(expect_params, packet):
    """Check the TCP layer of a sniffed packet against expectations.

    Keys read from ``expect_params`` (all optional, verified only when
    truthy):
      'tcp_flags' -- expected TCP flags, e.g. 'S' or 'SA';
      'seq'       -- sequence number of the initial SYN.  For an expected
                     'SA' packet it is compared with the packet's
                     acknowledgment number minus one; for any other (or no)
                     expected flags it is compared with the packet's own
                     sequence number;
      'mss'       -- expected value of every MSS option present.  A packet
                     without an MSS option is not rejected.

    The TCP checksum is verified by clearing the field, rebuilding the
    packet from its raw bytes and comparing against the recomputed value.
    Note that the checksum field of the packet passed in is left cleared
    (None) as a side effect.

    Returns True when the packet has a TCP layer and all verified values
    match, False otherwise; the reason is logged at debug level.
    """
    tcp_flags = expect_params.get('tcp_flags')
    mss = expect_params.get('mss')
    seq = expect_params.get('seq')
    tcp = packet.getlayer(sp.TCP)
    if not tcp:
        LOGGER.debug('Packet is not TCP!')
        return False

    # With the field set to None scapy fills in a fresh checksum when the
    # packet is serialised, which gives us the value to compare against.
    chksum = tcp.chksum
    tcp.chksum = None
    newpacket = sp.Ether(sp.raw(packet[sp.Ether]))
    new_chksum = newpacket[sp.TCP].chksum
    if new_chksum and chksum != new_chksum:
        LOGGER.debug(f'Wrong TCP checksum {chksum}, expected {new_chksum}!')
        return False

    if tcp_flags and tcp.flags != tcp_flags:
        LOGGER.debug(f'Wrong TCP flags {tcp.flags}, expected {tcp_flags}!')
        return False

    if seq:
        # A SYN+ACK acknowledges the SYN's sequence number plus one.  Every
        # other case falls back to the packet's own sequence number so that
        # tcp_seq is always defined (it used to be unbound unless the
        # expected flags were exactly 'S' or 'SA').
        tcp_seq = tcp.ack - 1 if tcp_flags == 'SA' else tcp.seq
        if seq != tcp_seq:
            LOGGER.debug(f'Wrong TCP Sequence Number {tcp_seq}, expected {seq}')
            return False

    if mss:
        for option in tcp.options:
            if option[0] == 'MSS' and option[1] != mss:
                LOGGER.debug(f'Wrong TCP MSS {option[1]}, expected {mss}')
                return False

    return True
378
379
def check_udp(expect_params, packet):
    """Check the UDP layer and payload of a sniffed packet.

    The packet must contain a UDP layer and a raw payload equal to
    build_payload(expect_params['length']).  The UDP checksum is verified
    by clearing the field, rebuilding the packet from its raw bytes and
    comparing against the recomputed value; the checksum field of the
    packet passed in stays cleared (None) afterwards.

    Returns True on a match, False otherwise (reason logged at debug).
    """
    expect_length = expect_params['length']
    datagram = packet.getlayer(sp.UDP)
    if not datagram:
        LOGGER.debug('Packet is not UDP!')
        return False
    body = packet.getlayer(sp.Raw)
    if not body:
        LOGGER.debug('Packet contains no payload!')
        return False
    if body.load != build_payload(expect_length):
        LOGGER.debug(f'Payload magic does not match len {len(body.load)} vs {expect_length}!')
        return False

    # With the field set to None scapy fills in a fresh checksum when the
    # packet is serialised, which gives us the value to compare against.
    sniffed_chksum = datagram.chksum
    datagram.chksum = None
    rebuilt = sp.Ether(sp.raw(packet[sp.Ether]))
    recomputed = rebuilt[sp.UDP].chksum
    # A falsy recomputed checksum is not compared at all.
    checksum_ok = (not recomputed) or sniffed_chksum == recomputed
    if not checksum_ok:
        LOGGER.debug(f'Wrong UDP checksum {sniffed_chksum}, expected {recomputed}!')
        return False

    return True
402
403
def check_tcp_syn_request_4(expect_params, packet):
    """Return True when the packet is the expected TCP SYN over IPv4.

    The IPv4 header is checked first; the TCP layer is only inspected
    (with the expected flags forced to 'S') when the header matched.
    """
    return (
        check_ipv4(expect_params, packet)
        and check_tcp(expect_params | {'tcp_flags': 'S'}, packet)
    )
410
411
def check_tcp_syn_reply_4(send_params, expect_params, packet):
    """Return True when the packet is the expected TCP SYN+ACK over IPv4.

    ``send_params`` is accepted but not used by this function.  The IPv4
    header is checked first; the TCP layer is only inspected (with the
    expected flags forced to 'SA') when the header matched.
    """
    return (
        check_ipv4(expect_params, packet)
        and check_tcp(expect_params | {'tcp_flags': 'SA'}, packet)
    )
418
419
def check_tcp_3way_4(args, packet):
    """Complete an IPv4 TCP 3-way handshake when the SYN+ACK is sniffed.

    ``args`` holds 'send_params' (parameters of the SYN that was sent) and
    'expect_params'.  The expected SYN+ACK must come from the address the
    SYN was sent to and be addressed to the SYN's source address.

    When the packet matches, the final ACK is sent on the 'sendif'
    interface and True is returned; otherwise nothing is sent and the
    result is False.
    """
    send_params = args['send_params']

    # The reply travels in the opposite direction of the SYN.
    wanted = clean_params(args['expect_params'])
    wanted['src_address'] = send_params['dst_address']
    wanted['dst_address'] = send_params['src_address']

    if not check_ipv4(wanted, packet):
        return False
    if not check_tcp(wanted | {'tcp_flags': 'SA'}, packet):
        return False

    syn_ack_ip = packet.getlayer(sp.IP)
    syn_ack_tcp = packet.getlayer(sp.TCP)

    # Address the ACK by mirroring the addresses and ports of the SYN+ACK.
    ack_params = clean_params(send_params)
    ack_params['src_address'] = syn_ack_ip.dst
    ack_params['dst_address'] = syn_ack_ip.src
    ack_segment = sp.TCP(
        sport=syn_ack_tcp.dport,
        dport=syn_ack_tcp.sport,
        flags='A',
        seq=syn_ack_tcp.ack,
        ack=syn_ack_tcp.seq + 1,
    )
    frame = sp.Ether() / prepare_ipv4(ack_params) / ack_segment
    sp.sendp(frame, iface=send_params['sendif'], verbose=False)
    return True
448
449
def check_udp_request_4(expect_params, packet):
    """Return True when the packet is the expected UDP datagram over IPv4.

    The IPv4 header is checked first; the UDP layer is only inspected when
    the header matched.
    """
    return check_ipv4(expect_params, packet) and check_udp(expect_params, packet)
456
457
def check_tcp_syn_request_6(expect_params, packet):
    """Return True when the packet is the expected TCP SYN over IPv6.

    The IPv6 header is checked first; the TCP layer is only inspected
    (with the expected flags forced to 'S') when the header matched.
    """
    return (
        check_ipv6(expect_params, packet)
        and check_tcp(expect_params | {'tcp_flags': 'S'}, packet)
    )
464
465
def check_tcp_syn_reply_6(expect_params, packet):
    """Return True when the packet is the expected TCP SYN+ACK over IPv6.

    The IPv6 header is checked first; the TCP layer is only inspected
    (with the expected flags forced to 'SA') when the header matched.
    """
    return (
        check_ipv6(expect_params, packet)
        and check_tcp(expect_params | {'tcp_flags': 'SA'}, packet)
    )
472
473
def check_tcp_3way_6(args, packet):
    """Complete an IPv6 TCP 3-way handshake when the SYN+ACK is sniffed.

    ``args`` holds 'send_params' (parameters of the SYN that was sent) and
    'expect_params'.  The expected SYN+ACK must come from the address the
    SYN was sent to and be addressed to the SYN's source address.

    When the packet matches, the final ACK is sent on the 'sendif'
    interface and True is returned; otherwise nothing is sent and the
    result is False.
    """
    send_params = args['send_params']

    # The reply travels in the opposite direction of the SYN.
    wanted = clean_params(args['expect_params'])
    wanted['src_address'] = send_params['dst_address']
    wanted['dst_address'] = send_params['src_address']

    if not check_ipv6(wanted, packet):
        return False
    if not check_tcp(wanted | {'tcp_flags': 'SA'}, packet):
        return False

    syn_ack_ip6 = packet.getlayer(sp.IPv6)
    syn_ack_tcp = packet.getlayer(sp.TCP)

    # Address the ACK by mirroring the addresses and ports of the SYN+ACK.
    ack_params = clean_params(send_params)
    ack_params['src_address'] = syn_ack_ip6.dst
    ack_params['dst_address'] = syn_ack_ip6.src
    ack_segment = sp.TCP(
        sport=syn_ack_tcp.dport,
        dport=syn_ack_tcp.sport,
        flags='A',
        seq=syn_ack_tcp.ack,
        ack=syn_ack_tcp.seq + 1,
    )
    frame = sp.Ether() / prepare_ipv6(ack_params) / ack_segment
    sp.sendp(frame, iface=send_params['sendif'], verbose=False)
    return True
502
503
def check_udp_request_6(expect_params, packet):
    """Return True when the packet is the expected UDP datagram over IPv6.

    The IPv6 header is checked first; the UDP layer is only inspected when
    the header matched.
    """
    return check_ipv6(expect_params, packet) and check_udp(expect_params, packet)
510
def check_tcp_syn_request(args, packet):
    """Dispatch a TCP SYN check to the IPv4 or IPv6 implementation.

    ``args['expect_params']`` must contain a truthy 'src_address' or
    'dst_address'; a ':' in either of them selects the IPv6 checker.

    Returns the checker's verdict.  Raises Exception when neither address
    is given.
    """
    expect_params = args['expect_params']
    addresses = (
        expect_params.get('src_address'),
        expect_params.get('dst_address'),
    )
    if not any(addresses):
        raise Exception('Source or destination address must be given to match the tcp syn request!')
    if any(address and ':' in address for address in addresses):
        return check_tcp_syn_request_6(expect_params, packet)
    return check_tcp_syn_request_4(expect_params, packet)
524
525
def check_tcp_syn_reply(args, packet):
    """Dispatch a TCP SYN+ACK check to the IPv4 or IPv6 implementation.

    ``args`` is the context dict holding 'expect_params' and
    'send_params'.  The expected parameters must contain a truthy
    'src_address' or 'dst_address'; a ':' in either of them selects the
    IPv6 checker.

    Returns the checker's verdict.  Raises Exception when neither address
    is given.
    """
    expect_params = args['expect_params']
    src_address = expect_params.get('src_address')
    dst_address = expect_params.get('dst_address')
    if not (src_address or dst_address):
        raise Exception('Source or destination address must be given to match the tcp syn reply!')
    if (
        (src_address and ':' in src_address) or
        (dst_address and ':' in dst_address)
    ):
        return check_tcp_syn_reply_6(expect_params, packet)
    # Unlike its IPv6 counterpart, check_tcp_syn_reply_4() takes the send
    # parameters as its first argument; omitting them raised TypeError.
    return check_tcp_syn_reply_4(args.get('send_params'), expect_params, packet)
539
def check_tcp_3way(args, packet):
    """Dispatch a TCP 3-way handshake check to the IPv4 or IPv6 code.

    ``args['expect_params']`` must contain a truthy 'src_address' or
    'dst_address'; a ':' in either of them selects the IPv6 checker.  The
    whole ``args`` dict is handed on because the checker also needs the
    send parameters.

    Returns the checker's verdict.  Raises Exception when neither address
    is given.
    """
    expect_params = args['expect_params']
    src_address = expect_params.get('src_address')
    dst_address = expect_params.get('dst_address')
    if not (src_address or dst_address):
        raise Exception('Source or destination address must be given to match the tcp 3-way handshake!')
    if (
        (src_address and ':' in src_address) or
        (dst_address and ':' in dst_address)
    ):
        return check_tcp_3way_6(args, packet)
    return check_tcp_3way_4(args, packet)
553
554
def check_udp_request(args, packet):
    """Dispatch a UDP datagram check to the IPv4 or IPv6 implementation.

    ``args['expect_params']`` must contain a truthy 'src_address' or
    'dst_address'; a ':' in either of them selects the IPv6 checker.

    Returns the checker's verdict.  Raises Exception when neither address
    is given.
    """
    expect_params = args['expect_params']
    src_address = expect_params.get('src_address')
    dst_address = expect_params.get('dst_address')
    if not (src_address or dst_address):
        raise Exception('Source or destination address must be given to match the udp request!')
    if (
        (src_address and ':' in src_address) or
        (dst_address and ':' in dst_address)
    ):
        return check_udp_request_6(expect_params, packet)
    return check_udp_request_4(expect_params, packet)
568
569
def setup_sniffer(
        recvif, ping_type, sniff_type, expect_params, defrag, send_params,
):
    """Create a Sniffer on ``recvif`` for the given ping and sniff type.

    The check function is selected from the (ping_type, sniff_type) pair:
    ICMP and TCP SYN pings can be sniffed as 'request' or 'reply', the
    3-way handshake only as 'reply' and UDP only as 'request'.

    The Sniffer receives a context dict with both ``send_params`` and
    ``expect_params``, the check function, the interface and the
    ``defrag`` setting.

    Returns the Sniffer object.  Raises Exception for any other
    combination of ping and sniff type.
    """
    combination = (ping_type, sniff_type)
    if combination == ('icmp', 'request'):
        checkfn = check_ping_request
    elif combination == ('icmp', 'reply'):
        checkfn = check_ping_reply
    elif combination == ('tcpsyn', 'request'):
        checkfn = check_tcp_syn_request
    elif combination == ('tcpsyn', 'reply'):
        checkfn = check_tcp_syn_reply
    elif combination == ('tcp3way', 'reply'):
        checkfn = check_tcp_3way
    elif combination == ('udp', 'request'):
        checkfn = check_udp_request
    else:
        raise Exception('Unsupported ping and sniff type combination')

    return Sniffer(
        {'send_params': send_params, 'expect_params': expect_params},
        checkfn, recvif, defrag=defrag,
    )
592
593
def parse_args():
    """Parse the command line and return the argparse namespace.

    Options fall into four groups: where and what to send (--sendif, --to,
    --ping-type, --fromaddr), where to sniff (--recvif, --replyif), values
    to set in transmitted packets (--send-*) and values expected in
    sniffed packets (--expect-*), plus -v/--verbose.
    """
    parser = argparse.ArgumentParser("pft_ping.py",
        description="Ping test tool")

    # Parameters of sent ping request
    parser.add_argument('--sendif', required=True,
        help='The interface through which the packet(s) will be sent')
    parser.add_argument('--to', required=True,
        help='The destination IP address for the ping request')
    parser.add_argument('--ping-type',
        choices=('icmp', 'tcpsyn', 'tcp3way', 'udp'),
        help='Type of ping: ICMP (default), TCP SYN, 3-way TCP handshake or UDP',
        default='icmp')
    parser.add_argument('--fromaddr',
        help='The source IP address for the ping request')

    # Where to look for packets to analyze.
    # The '+' format is ugly as it mixes positional with optional syntax.
    # But we have no positional parameters so I guess it's fine to use it.
    parser.add_argument('--recvif', nargs='+',
        help='The interfaces on which to expect the ping request')
    parser.add_argument('--replyif', nargs='+',
        help='The interfaces which to expect the ping response')

    # Packet settings
    parser_send = parser.add_argument_group('Values set in transmitted packets')
    parser_send.add_argument('--send-flags', type=str,
        help='IPv4 fragmentation flags')
    parser_send.add_argument('--send-frag-length', type=int,
        help='Force IP fragmentation with given fragment length')
    parser_send.add_argument('--send-hlim', type=int,
        help='IPv6 Hop Limit or IPv4 Time To Live')
    parser_send.add_argument('--send-mss', type=int,
        help='TCP Maximum Segment Size')
    parser_send.add_argument('--send-seq', type=int,
        help='TCP sequence number')
    # The port and length options are shared by the TCP, UDP and ICMP
    # senders, so their help texts name every protocol that reads them.
    parser_send.add_argument('--send-sport', type=int,
        help='TCP or UDP source port')
    parser_send.add_argument('--send-dport', type=int, default=9,
        help='TCP or UDP destination port')
    parser_send.add_argument('--send-length', type=int, default=len(PAYLOAD_MAGIC),
        help='ICMP Echo Request or UDP payload size')
    parser_send.add_argument('--send-tc', type=int,
        help='IPv6 Traffic Class or IPv4 DiffServ / ToS')
    parser_send.add_argument('--send-tcpopt-unaligned', action='store_true',
        help='Include unaligned TCP options')
    parser_send.add_argument('--send-nop', action='store_true',
        help='Include a NOP IPv4 option')

    # Expectations
    parser_expect = parser.add_argument_group('Values expected in sniffed packets')
    parser_expect.add_argument('--expect-flags', type=str,
        help='IPv4 fragmentation flags')
    parser_expect.add_argument('--expect-hlim', type=int,
        help='IPv6 Hop Limit or IPv4 Time To Live')
    parser_expect.add_argument('--expect-mss', type=int,
        help='TCP Maximum Segment Size')
    # Registered with the expectation group so that --help lists it next
    # to the other --expect-* options.
    parser_expect.add_argument('--expect-seq', type=int,
        help='TCP sequence number')
    parser_expect.add_argument('--expect-tc', type=int,
        help='IPv6 Traffic Class or IPv4 DiffServ / ToS')

    parser.add_argument('-v', '--verbose', action='store_true',
        help=('Enable verbose logging. Apart of potentially useful information '
            'you might see warnings from parsing packets like NDP or other '
            'packets not related to the test being run. Use only when '
            'developing because real tests expect empty stderr and stdout.'))

    return parser.parse_args()
663
664
def main():
    """Run one ping test: install sniffers, send the ping, collect results.

    Sniffers are created for every --recvif interface (expecting the
    request) and every --replyif interface (expecting the reply) before
    the ping is sent.

    Returns 0 when every sniffer reports exactly one correct packet.
    Otherwise the result is a bitmask in which bit N is set when the N-th
    sniffer (receive sniffers first, then reply sniffers, in command-line
    order) reported none or more than one.  Because a process exit status
    only carries 8 bits, a mask whose low 8 bits are all clear is replaced
    by 0xff so that a failure is never reported as success.

    Raises Exception when neither --recvif nor --replyif was given.
    """
    args = parse_args()

    if args.verbose:
        LOGGER.setLevel(logging.DEBUG)

    # Split parameters into send and expect parameters. Parameters might be
    # missing from the command line, always fill the dictionaries with None.
    # Falsy values (0, empty string) are stored as None as well.
    send_params = {}
    expect_params = {}
    for param_name in (
        'flags', 'hlim', 'length', 'mss', 'seq', 'tc', 'frag_length',
        'sport', 'dport',
    ):
        param_arg = vars(args).get(f'send_{param_name}')
        send_params[param_name] = param_arg if param_arg else None
        param_arg = vars(args).get(f'expect_{param_name}')
        expect_params[param_name] = param_arg if param_arg else None

    # There is no --expect-length option: the payload is expected to arrive
    # with the length it was sent with.
    expect_params['length'] = send_params['length']
    send_params['tcpopt_unaligned'] = args.send_tcpopt_unaligned
    send_params['nop'] = args.send_nop
    send_params['src_address'] = args.fromaddr if args.fromaddr else None
    send_params['dst_address'] = args.to
    send_params['sendif'] = args.sendif

    # We may not have a default route. Tell scapy where to start looking for routes
    sp.conf.iface6 = args.sendif

    # Configuration sanity checking.
    if not (args.replyif or args.recvif):
        raise Exception('With no reply or recv interface specified no traffic '
            'can be sniffed and verified!'
        )

    sniffers = []

    # Fragmented pings have to be reassembled by the sniffer; tell it which
    # IP version to reassemble, judging by a ':' in either address.
    if send_params['frag_length']:
        if (
            (send_params['src_address'] and ':' in send_params['src_address']) or
            (send_params['dst_address'] and ':' in send_params['dst_address'])
        ):
            defrag = 'IPv6'
        else:
            defrag = 'IPv4'
    else:
        defrag = False

    if args.recvif:
        # Requests are matched by their destination address only.
        sniffer_params = copy(expect_params)
        sniffer_params['src_address'] = None
        sniffer_params['dst_address'] = args.to
        for iface in args.recvif:
            LOGGER.debug(f'Installing receive sniffer on {iface}')
            sniffers.append(
                setup_sniffer(iface, args.ping_type, 'request',
                              sniffer_params, defrag, send_params,
            ))

    if args.replyif:
        # Replies are matched by their source address only.
        sniffer_params = copy(expect_params)
        sniffer_params['src_address'] = args.to
        sniffer_params['dst_address'] = None
        for iface in args.replyif:
            LOGGER.debug(f'Installing reply sniffer on {iface}')
            sniffers.append(
                setup_sniffer(iface, args.ping_type, 'reply',
                              sniffer_params, defrag, send_params,
            ))

    LOGGER.debug(f'Installed {len(sniffers)} sniffers')

    send_ping(args.ping_type, send_params)

    err = 0
    for sniffer_num, sniffer in enumerate(sniffers):
        # Sniffer comes from another module; join() presumably blocks until
        # the sniffer has finished -- verify against its implementation.
        sniffer.join()
        if sniffer.correctPackets == 1:
            LOGGER.debug(f'Expected ping has been sniffed on {sniffer._recvif}.')
            continue
        # Set a bit in err for each failed sniffer.
        err |= 1 << sniffer_num
        if sniffer.correctPackets > 1:
            LOGGER.debug(f'Duplicated ping has been sniffed on {sniffer._recvif}!')
        else:
            LOGGER.debug(f'Expected ping has not been sniffed on {sniffer._recvif}!')

    # Only the low 8 bits of an exit status reach the parent process. With
    # more than eight sniffers a failure recorded solely in the higher bits
    # would otherwise be truncated to 0, i.e. reported as success.
    if err and not err & 0xff:
        err = 0xff

    return err
755
756
# Script entry point: the value returned by main() becomes the exit status.
if __name__ == '__main__':
    raise SystemExit(main())
759