xref: /freebsd/tests/sys/netpfil/common/pft_ping.py (revision 9207f9d206a4017001f01ca27d3d25a26c268a95)
1#!/usr/bin/env python3
2#
3# SPDX-License-Identifier: BSD-2-Clause
4#
5# Copyright (c) 2017 Kristof Provost <kp@FreeBSD.org>
6# Copyright (c) 2023 Kajetan Staszkiewicz <vegeta@tuxpowered.net>
7#
8# Redistribution and use in source and binary forms, with or without
9# modification, are permitted provided that the following conditions
10# are met:
11# 1. Redistributions of source code must retain the above copyright
12#    notice, this list of conditions and the following disclaimer.
13# 2. Redistributions in binary form must reproduce the above copyright
14#    notice, this list of conditions and the following disclaimer in the
15#    documentation and/or other materials provided with the distribution.
16#
17# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
18# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20# ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
21# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
22# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
23# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
24# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
26# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
27# SUCH DAMAGE.
28#
29
30import argparse
31import logging
32logging.getLogger("scapy").setLevel(logging.CRITICAL)
33import math
34import scapy.all as sp
35import sys
36
37from copy import copy
38from sniffer import Sniffer
39
40logging.basicConfig(format='%(message)s')
41LOGGER = logging.getLogger(__name__)
42
43PAYLOAD_MAGIC = bytes.fromhex('42c0ffee')
44
45def build_payload(l):
46    pl = len(PAYLOAD_MAGIC)
47    ret = PAYLOAD_MAGIC * math.floor(l/pl)
48    ret += PAYLOAD_MAGIC[0:(l % pl)]
49    return ret
50
51
52def prepare_ipv6(dst_address, send_params):
53    src_address = send_params.get('src_address')
54    hlim = send_params.get('hlim')
55    tc = send_params.get('tc')
56    ip6 = sp.IPv6(dst=dst_address)
57    if src_address:
58        ip6.src = src_address
59    if hlim:
60        ip6.hlim = hlim
61    if tc:
62        ip6.tc = tc
63    return ip6
64
65
66def prepare_ipv4(dst_address, send_params):
67    src_address = send_params.get('src_address')
68    flags = send_params.get('flags')
69    tos = send_params.get('tc')
70    ttl = send_params.get('hlim')
71    opt = send_params.get('nop')
72    options = ''
73    if opt:
74        options='\x00'
75    ip = sp.IP(dst=dst_address, options=options)
76    if src_address:
77        ip.src = src_address
78    if flags:
79        ip.flags = flags
80    if tos:
81        ip.tos = tos
82    if ttl:
83        ip.ttl = ttl
84    return ip
85
86
87def send_icmp_ping(dst_address, sendif, send_params):
88    send_length = send_params['length']
89    send_frag_length = send_params['frag_length']
90    packets = []
91    ether = sp.Ether()
92    if ':' in dst_address:
93        ip6 = prepare_ipv6(dst_address, send_params)
94        icmp = sp.ICMPv6EchoRequest(data=sp.raw(build_payload(send_length)))
95        if send_frag_length:
96            for packet in sp.fragment(ip6 / icmp, fragsize=send_frag_length):
97                packets.append(ether / packet)
98        else:
99            packets.append(ether / ip6 / icmp)
100
101    else:
102        ip = prepare_ipv4(dst_address, send_params)
103        icmp = sp.ICMP(type='echo-request')
104        raw = sp.raw(build_payload(send_length))
105        if send_frag_length:
106            for packet in sp.fragment(ip / icmp / raw, fragsize=send_frag_length):
107                packets.append(ether / packet)
108        else:
109            packets.append(ether / ip / icmp / raw)
110    for packet in packets:
111        sp.sendp(packet, sendif, verbose=False)
112
113
114def send_tcp_syn(dst_address, sendif, send_params):
115    tcpopt_unaligned = send_params.get('tcpopt_unaligned')
116    seq = send_params.get('seq')
117    mss = send_params.get('mss')
118    ether = sp.Ether()
119    opts=[('Timestamp', (1, 1)), ('MSS', mss if mss else 1280)]
120    if tcpopt_unaligned:
121        opts = [('NOP', 0 )] + opts
122    if ':' in dst_address:
123        ip = prepare_ipv6(dst_address, send_params)
124    else:
125        ip = prepare_ipv4(dst_address, send_params)
126    tcp = sp.TCP(dport=666, flags='S', options=opts, seq=seq)
127    req = ether / ip / tcp
128    sp.sendp(req, iface=sendif, verbose=False)
129
130
131def send_ping(dst_address, sendif, ping_type, send_params):
132    if ping_type == 'icmp':
133        send_icmp_ping(dst_address, sendif, send_params)
134    elif ping_type == 'tcpsyn':
135        send_tcp_syn(dst_address, sendif, send_params)
136    else:
137        raise Exception('Unspported ping type')
138
139
140def check_ipv4(expect_params, packet):
141    src_address = expect_params.get('src_address')
142    dst_address = expect_params.get('dst_address')
143    flags = expect_params.get('flags')
144    tos = expect_params.get('tc')
145    ttl = expect_params.get('hlim')
146    ip = packet.getlayer(sp.IP)
147    if not ip:
148        LOGGER.debug('Packet is not IPv4!')
149        return False
150    if src_address and ip.src != src_address:
151        LOGGER.debug('Source IPv4 address does not match!')
152        return False
153    if dst_address and ip.dst != dst_address:
154        LOGGER.debug('Destination IPv4 address does not match!')
155        return False
156    chksum = ip.chksum
157    ip.chksum = None
158    new_chksum = sp.IP(sp.raw(ip)).chksum
159    if chksum != new_chksum:
160        LOGGER.debug(f'Expected IP checksum {new_chksum} but found {chksum}')
161        return False
162    if flags and ip.flags != flags:
163        LOGGER.debug(f'Wrong IP flags value {ip.flags}, expected {flags}')
164        return False
165    if tos and ip.tos != tos:
166        LOGGER.debug(f'Wrong ToS value {ip.tos}, expected {tos}')
167        return False
168    if ttl and ip.ttl != ttl:
169        LOGGER.debug(f'Wrong TTL value {ip.ttl}, expected {ttl}')
170        return False
171    return True
172
173
174def check_ipv6(expect_params, packet):
175    src_address = expect_params.get('src_address')
176    dst_address = expect_params.get('dst_address')
177    flags = expect_params.get('flags')
178    hlim = expect_params.get('hlim')
179    tc = expect_params.get('tc')
180    ip6 = packet.getlayer(sp.IPv6)
181    if not ip6:
182        LOGGER.debug('Packet is not IPv6!')
183        return False
184    if src_address and ip6.src != src_address:
185        LOGGER.debug('Source IPv6 address does not match!')
186        return False
187    if dst_address and ip6.dst != dst_address:
188        LOGGER.debug('Destination IPv6 address does not match!')
189        return False
190    # IPv6 has no IP-level checksum.
191    if flags:
192        raise Exception("There's no fragmentation flags in IPv6")
193    if hlim and ip6.hlim != hlim:
194        LOGGER.debug(f'Wrong Hop Limit value {ip6.hlim}, expected {hlim}')
195        return False
196    if tc and ip6.tc != tc:
197        LOGGER.debug(f'Wrong TC value {ip6.tc}, expected {tc}')
198        return False
199    return True
200
201
202def check_ping_4(expect_params, packet):
203    expect_length = expect_params['length']
204    if not check_ipv4(expect_params, packet):
205        return False
206    icmp = packet.getlayer(sp.ICMP)
207    if not icmp:
208        LOGGER.debug('Packet is not IPv4 ICMP!')
209        return False
210    raw = packet.getlayer(sp.Raw)
211    if not raw:
212        LOGGER.debug('Packet contains no payload!')
213        return False
214    if raw.load != build_payload(expect_length):
215        LOGGER.debug('Payload magic does not match!')
216        return False
217    return True
218
219
220def check_ping_request_4(expect_params, packet):
221    if not check_ping_4(expect_params, packet):
222        return False
223    icmp = packet.getlayer(sp.ICMP)
224    if sp.icmptypes[icmp.type] != 'echo-request':
225        LOGGER.debug('Packet is not IPv4 ICMP Echo Request!')
226        return False
227    return True
228
229
230def check_ping_reply_4(expect_params, packet):
231    if not check_ping_4(expect_params, packet):
232        return False
233    icmp = packet.getlayer(sp.ICMP)
234    if sp.icmptypes[icmp.type] != 'echo-reply':
235        LOGGER.debug('Packet is not IPv4 ICMP Echo Reply!')
236        return False
237    return True
238
239
240def check_ping_request_6(expect_params, packet):
241    expect_length = expect_params['length']
242    if not check_ipv6(expect_params, packet):
243        return False
244    icmp = packet.getlayer(sp.ICMPv6EchoRequest)
245    if not icmp:
246        LOGGER.debug('Packet is not IPv6 ICMP Echo Request!')
247        return False
248    if icmp.data != build_payload(expect_length):
249        LOGGER.debug('Payload magic does not match!')
250        return False
251    return True
252
253
254def check_ping_reply_6(expect_params, packet):
255    expect_length = expect_params['length']
256    if not check_ipv6(expect_params, packet):
257        return False
258    icmp = packet.getlayer(sp.ICMPv6EchoReply)
259    if not icmp:
260        LOGGER.debug('Packet is not IPv6 ICMP Echo Reply!')
261        return False
262    if icmp.data != build_payload(expect_length):
263        LOGGER.debug('Payload magic does not match!')
264        return False
265    return True
266
267
268def check_ping_request(expect_params, packet):
269    src_address = expect_params.get('src_address')
270    dst_address = expect_params.get('dst_address')
271    if not (src_address or dst_address):
272        raise Exception('Source or destination address must be given to match the ping request!')
273    if (
274        (src_address and ':' in src_address) or
275        (dst_address and ':' in dst_address)
276    ):
277        return check_ping_request_6(expect_params, packet)
278    else:
279        return check_ping_request_4(expect_params, packet)
280
281
282def check_ping_reply(expect_params, packet):
283    src_address = expect_params.get('src_address')
284    dst_address = expect_params.get('dst_address')
285    if not (src_address or dst_address):
286        raise Exception('Source or destination address must be given to match the ping reply!')
287    if (
288        (src_address and ':' in src_address) or
289        (dst_address and ':' in dst_address)
290    ):
291        return check_ping_reply_6(expect_params, packet)
292    else:
293        return check_ping_reply_4(expect_params, packet)
294
295
296def check_tcp(expect_params, packet):
297    tcp_flags = expect_params.get('tcp_flags')
298    mss = expect_params.get('mss')
299    seq = expect_params.get('seq')
300    tcp = packet.getlayer(sp.TCP)
301    if not tcp:
302        LOGGER.debug('Packet is not TCP!')
303        return False
304    chksum = tcp.chksum
305    tcp.chksum = None
306    newpacket = sp.Ether(sp.raw(packet[sp.Ether]))
307    new_chksum = newpacket[sp.TCP].chksum
308    if chksum != new_chksum:
309        LOGGER.debug(f'Wrong TCP checksum {chksum}, expected {new_chksum}!')
310        return False
311    if tcp_flags and tcp.flags != tcp_flags:
312        LOGGER.debug(f'Wrong TCP flags {tcp.flags}, expected {tcp_flags}!')
313        return False
314    if seq:
315        if tcp_flags == 'S':
316            tcp_seq = tcp.seq
317        elif tcp_flags == 'SA':
318            tcp_seq = tcp.ack - 1
319        if seq != tcp_seq:
320            LOGGER.debug(f'Wrong TCP Sequence Number {tcp_seq}, expected {seq}')
321            return False
322    if mss:
323        for option in tcp.options:
324            if option[0] == 'MSS':
325                if option[1] != mss:
326                    LOGGER.debug(f'Wrong TCP MSS {option[1]}, expected {mss}')
327                    return False
328    return True
329
330
331def check_tcp_syn_request_4(expect_params, packet):
332    if not check_ipv4(expect_params, packet):
333        return False
334    if not check_tcp(expect_params | {'tcp_flags': 'S'}, packet):
335        return False
336    return True
337
338
339def check_tcp_syn_reply_4(expect_params, packet):
340    if not check_ipv4(expect_params, packet):
341        return False
342    if not check_tcp(expect_params | {'tcp_flags': 'SA'}, packet):
343        return False
344    return True
345
346
347def check_tcp_syn_request_6(expect_params, packet):
348    if not check_ipv6(expect_params, packet):
349        return False
350    if not check_tcp(expect_params | {'tcp_flags': 'S'}, packet):
351        return False
352    return True
353
354
355def check_tcp_syn_reply_6(expect_params, packet):
356    if not check_ipv6(expect_params, packet):
357        return False
358    if not check_tcp(expect_params | {'tcp_flags': 'SA'}, packet):
359        return False
360    return True
361
362
363def check_tcp_syn_request(expect_params, packet):
364    src_address = expect_params.get('src_address')
365    dst_address = expect_params.get('dst_address')
366    if not (src_address or dst_address):
367        raise Exception('Source or destination address must be given to match the tcp syn request!')
368    if (
369        (src_address and ':' in src_address) or
370        (dst_address and ':' in dst_address)
371    ):
372        return check_tcp_syn_request_6(expect_params, packet)
373    else:
374        return check_tcp_syn_request_4(expect_params, packet)
375
376
377def check_tcp_syn_reply(expect_params, packet):
378    src_address = expect_params.get('src_address')
379    dst_address = expect_params.get('dst_address')
380    if not (src_address or dst_address):
381        raise Exception('Source or destination address must be given to match the tcp syn reply!')
382    if (
383        (src_address and ':' in src_address) or
384        (dst_address and ':' in dst_address)
385    ):
386        return check_tcp_syn_reply_6(expect_params, packet)
387    else:
388        return check_tcp_syn_reply_4(expect_params, packet)
389
390
391def setup_sniffer(recvif, ping_type, sniff_type, expect_params, defrag):
392    if ping_type == 'icmp' and sniff_type == 'request':
393        checkfn = check_ping_request
394    elif ping_type == 'icmp' and sniff_type == 'reply':
395        checkfn = check_ping_reply
396    elif ping_type == 'tcpsyn' and sniff_type == 'request':
397        checkfn = check_tcp_syn_request
398    elif ping_type == 'tcpsyn' and sniff_type == 'reply':
399        checkfn = check_tcp_syn_reply
400    else:
401        raise Exception('Unspported ping or sniff type')
402
403    return Sniffer(expect_params, checkfn, recvif, defrag=defrag)
404
405
406def parse_args():
407    parser = argparse.ArgumentParser("pft_ping.py",
408        description="Ping test tool")
409
410    # Parameters of sent ping request
411    parser.add_argument('--sendif', nargs=1,
412        required=True,
413        help='The interface through which the packet(s) will be sent')
414    parser.add_argument('--to', nargs=1,
415        required=True,
416        help='The destination IP address for the ping request')
417    parser.add_argument('--ping-type',
418        choices=('icmp', 'tcpsyn'),
419        help='Type of ping: ICMP (default) or TCP SYN',
420        default='icmp')
421    parser.add_argument('--fromaddr', nargs=1,
422        help='The source IP address for the ping request')
423
424    # Where to look for packets to analyze.
425    # The '+' format is ugly as it mixes positional with optional syntax.
426    # But we have no positional parameters so I guess it's fine to use it.
427    parser.add_argument('--recvif', nargs='+',
428        help='The interfaces on which to expect the ping request')
429    parser.add_argument('--replyif', nargs='+',
430        help='The interfaces which to expect the ping response')
431
432    # Packet settings
433    parser_send = parser.add_argument_group('Values set in transmitted packets')
434    parser_send.add_argument('--send-flags', nargs=1, type=str,
435        help='IPv4 fragmentation flags')
436    parser_send.add_argument('--send-frag-length', nargs=1, type=int,
437         help='Force IP fragmentation with given fragment length')
438    parser_send.add_argument('--send-hlim', nargs=1, type=int,
439        help='IPv6 Hop Limit or IPv4 Time To Live')
440    parser_send.add_argument('--send-mss', nargs=1, type=int,
441        help='TCP Maximum Segment Size')
442    parser_send.add_argument('--send-seq', nargs=1, type=int,
443        help='TCP sequence number')
444    parser_send.add_argument('--send-length', nargs=1, type=int,
445        default=[len(PAYLOAD_MAGIC)], help='ICMP Echo Request payload size')
446    parser_send.add_argument('--send-tc', nargs=1, type=int,
447        help='IPv6 Traffic Class or IPv4 DiffServ / ToS')
448    parser_send.add_argument('--send-tcpopt-unaligned', action='store_true',
449         help='Include unaligned TCP options')
450    parser_send.add_argument('--send-nop', action='store_true',
451         help='Include a NOP IPv4 option')
452
453    # Expectations
454    parser_expect = parser.add_argument_group('Values expected in sniffed packets')
455    parser_expect.add_argument('--expect-flags', nargs=1, type=str,
456        help='IPv4 fragmentation flags')
457    parser_expect.add_argument('--expect-hlim', nargs=1, type=int,
458        help='IPv6 Hop Limit or IPv4 Time To Live')
459    parser_expect.add_argument('--expect-mss', nargs=1, type=int,
460        help='TCP Maximum Segment Size')
461    parser_send.add_argument('--expect-seq', nargs=1, type=int,
462        help='TCP sequence number')
463    parser_expect.add_argument('--expect-tc', nargs=1, type=int,
464        help='IPv6 Traffic Class or IPv4 DiffServ / ToS')
465
466    parser.add_argument('-v', '--verbose', action='store_true',
467        help=('Enable verbose logging. Apart of potentially useful information '
468            'you might see warnings from parsing packets like NDP or other '
469            'packets not related to the test being run. Use only when '
470            'developing because real tests expect empty stderr and stdout.'))
471
472    return parser.parse_args()
473
474
475def main():
476    args = parse_args()
477
478    if args.verbose:
479        LOGGER.setLevel(logging.DEBUG)
480
481    # Dig out real values of program arguments
482    send_if = args.sendif[0]
483    reply_ifs = args.replyif
484    recv_ifs = args.recvif
485    dst_address = args.to[0]
486
487    # Standardize parameters which have nargs=1.
488    send_params = {}
489    expect_params = {}
490    for param_name in ('flags', 'hlim', 'length', 'mss', 'seq', 'tc', 'frag_length'):
491        param_arg = vars(args).get(f'send_{param_name}')
492        send_params[param_name] = param_arg[0] if param_arg else None
493        param_arg = vars(args).get(f'expect_{param_name}')
494        expect_params[param_name] = param_arg[0] if param_arg else None
495
496    expect_params['length'] = send_params['length']
497    send_params['tcpopt_unaligned'] = args.send_tcpopt_unaligned
498    send_params['nop'] = args.send_nop
499    send_params['src_address'] = args.fromaddr[0] if args.fromaddr else None
500
501    # We may not have a default route. Tell scapy where to start looking for routes
502    sp.conf.iface6 = send_if
503
504    # Configuration sanity checking.
505    if not (reply_ifs or recv_ifs):
506        raise Exception('With no reply or recv interface specified no traffic '
507            'can be sniffed and verified!'
508        )
509
510    sniffers = []
511
512    if send_params['frag_length']:
513        defrag = True
514    else:
515        defrag = False
516
517    if recv_ifs:
518        sniffer_params = copy(expect_params)
519        sniffer_params['src_address'] = None
520        sniffer_params['dst_address'] = dst_address
521        for iface in recv_ifs:
522            LOGGER.debug(f'Installing receive sniffer on {iface}')
523            sniffers.append(
524                setup_sniffer(iface, args.ping_type, 'request',
525                              sniffer_params, defrag,
526            ))
527
528    if reply_ifs:
529        sniffer_params = copy(expect_params)
530        sniffer_params['src_address'] = dst_address
531        sniffer_params['dst_address'] = None
532        for iface in reply_ifs:
533            LOGGER.debug(f'Installing reply sniffer on {iface}')
534            sniffers.append(
535                setup_sniffer(iface, args.ping_type, 'reply',
536                              sniffer_params, defrag,
537            ))
538
539    LOGGER.debug(f'Installed {len(sniffers)} sniffers')
540
541    send_ping(dst_address, send_if, args.ping_type, send_params)
542
543    err = 0
544    sniffer_num = 0
545    for sniffer in sniffers:
546        sniffer.join()
547        if sniffer.correctPackets == 1:
548            LOGGER.debug(f'Expected ping has been sniffed on {sniffer._recvif}.')
549        else:
550            # Set a bit in err for each failed sniffer.
551            err |= 1<<sniffer_num
552            if sniffer.correctPackets > 1:
553                LOGGER.debug(f'Duplicated ping has been sniffed on {sniffer._recvif}!')
554            else:
555                LOGGER.debug(f'Expected ping has not been sniffed on {sniffer._recvif}!')
556        sniffer_num += 1
557
558    return err
559
560
561if __name__ == '__main__':
562    sys.exit(main())
563