#!/usr/bin/env python
# -
# SPDX-License-Identifier: BSD-2-Clause
#
# Copyright (c) 2020 Alexander V. Chernikov
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
# 1. Redistributions of source code must retain the above copyright
#    notice, this list of conditions and the following disclaimer.
# 2. Redistributions in binary form must reproduce the above copyright
#    notice, this list of conditions and the following disclaimer in the
#    documentation and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
# SUCH DAMAGE.
#
# $FreeBSD$
#


from functools import partial
import socket
import scapy.all as sc
import argparse
import time


# EtherType / protocol constants used by the sniff filters below.
_ETHERTYPE_IPV4 = 0x0800
_ETHERTYPE_IPV6 = 0x86DD
_IPPROTO_ICMPV6 = 58
_ICMP6_ECHO_REQUEST = 128

# How long (seconds) to wait for the forwarded packet to come back.
_SNIFF_TIMEOUT = 5


def parse_args():
    """
    Parses the command line.

    Returns the argparse namespace with dip, sip, dmac, smac, iface
    and test_name attributes. Only --test_name is mandatory; the
    others default to None when omitted.
    """
    parser = argparse.ArgumentParser(description='divert socket tester')
    parser.add_argument('--dip', type=str, help='destination packet IP')
    parser.add_argument('--sip', type=str, help='source packet IP')
    parser.add_argument('--dmac', type=str, help='packet dst mac')
    parser.add_argument('--smac', type=str, help='packet src mac')
    parser.add_argument('--iface', type=str, help='interface to use')
    parser.add_argument('--test_name', type=str, required=True,
                        help='test name to run')
    return parser.parse_args()


def send_packet(args, pkt):
    """
    Sends layer-2 frame @pkt out of the args.iface interface.
    """
    sc.sendp(pkt, iface=args.iface, verbose=False)


def is_icmp6_echo_request(pkt):
    """
    Returns True if ethernet frame @pkt carries an ICMPv6 echo request.

    The checks short-circuit, so the IPv6 next-header field is only
    read once the EtherType says the payload is IPv6, and the ICMPv6
    type is only read once the next header says ICMPv6.
    """
    return pkt.type == _ETHERTYPE_IPV6 and \
        pkt.payload.nh == _IPPROTO_ICMPV6 and \
        pkt.payload.payload.type == _ICMP6_ECHO_REQUEST


def _send_and_sniff(args, pkt, match):
    """
    Starts sniffing on args.iface, sends @pkt once the sniffer is
    running and returns the captured packets accepted by @match.

    @match is passed both as the capture filter and as the stop
    condition, so the capture is expected to end at the first
    matching packet or after _SNIFF_TIMEOUT seconds.
    """
    send_cb = partial(send_packet, args, pkt)
    return sc.sniff(iface=args.iface, started_callback=send_cb,
                    stop_filter=match, lfilter=match,
                    timeout=_SNIFF_TIMEOUT)


def _dump_packets(pkt, fwd_pkt, packets):
    """
    Prints the original packet, the forwarded packet and a one-line
    summary of every captured packet to help debugging a failed check.
    """
    print('Original packet:')
    pkt.show()
    print('Forwarded packet:')
    fwd_pkt.show()
    for idx, a_packet in enumerate(packets):
        print('{}: {}'.format(idx, a_packet.summary()))


def check_forwarded_ip_packet(orig_pkt, fwd_pkt):
    """
    Checks that forwarded ICMP packet @fwd_pkt is the same as
    @orig_pkt. Assumes router-on-the-stick forwarding behaviour:
    * src/dst macs are swapped
    * TTL is decremented

    Raises AssertionError on the first mismatch.
    """
    # Check ether fields
    assert orig_pkt.src == fwd_pkt.dst
    assert orig_pkt.dst == fwd_pkt.src
    assert len(orig_pkt) == len(fwd_pkt)
    # Check IP
    fwd_ip = fwd_pkt[sc.IP]
    orig_ip = orig_pkt[sc.IP]
    # Forwarding must not rewrite either address.
    assert orig_ip.src == fwd_ip.src
    assert orig_ip.dst == fwd_ip.dst
    assert orig_ip.ttl == fwd_ip.ttl + 1
    # Check ICMP
    assert fwd_ip.haslayer(sc.ICMP), 'forwarded packet has no ICMP layer'
    assert bytes(orig_ip.payload) == bytes(fwd_ip.payload)


def fwd_ip_icmp_fast(args):
    """
    Sends ICMP packet via args.iface interface.
    Receives and checks the forwarded packet.
    Assumes forwarding router decrements TTL

    On a failed check the packets involved are printed and an
    Exception chained to the original failure is raised.
    """

    def filter_f(x):
        # Forwarded frame comes back from the router's MAC as IPv4.
        return x.src == args.dmac and x.type == _ETHERTYPE_IPV4

    ether = sc.Ether(src=args.smac, dst=args.dmac)
    ip = sc.IP(src=args.sip, dst=args.dip)
    icmp = sc.ICMP(type='echo-request')
    pkt = ether / ip / icmp

    packets = _send_and_sniff(args, pkt, filter_f)
    assert len(packets) > 0, 'no forwarded packet captured'
    fwd_pkt = packets[-1]
    try:
        check_forwarded_ip_packet(pkt, fwd_pkt)
    except Exception as exc:
        _dump_packets(pkt, fwd_pkt, packets)
        raise Exception('forwarded IPv4 packet check failed') from exc


def fwd_ip_icmp_slow(args):
    """
    Sends ICMP packet via args.iface interface.
    Forces slow path processing by introducing IP option.
    Receives and checks the forwarded packet.
    Assumes forwarding router decrements TTL
    """

    def filter_f(x):
        # Forwarded frame comes back from the router's MAC as IPv4.
        return x.src == args.dmac and x.type == _ETHERTYPE_IPV4

    ether = sc.Ether(src=args.smac, dst=args.dmac)
    # Add IP option to switch to 'normal' IP processing
    stream_id = sc.IPOption_Stream_Id(security=0xFFFF)
    ip = sc.IP(src=args.sip, dst=args.dip, options=[stream_id])
    icmp = sc.ICMP(type='echo-request')
    pkt = ether / ip / icmp

    packets = _send_and_sniff(args, pkt, filter_f)
    assert len(packets) > 0, 'no forwarded packet captured'
    check_forwarded_ip_packet(pkt, packets[-1])


def check_forwarded_ip6_packet(orig_pkt, fwd_pkt):
    """
    Checks that forwarded ICMPv6 packet @fwd_pkt is the same as
    @orig_pkt. Assumes router-on-the-stick forwarding behaviour:
    * src/dst macs are swapped
    * hop limit is decremented

    Raises AssertionError on the first mismatch.
    """
    # Check ether fields
    assert orig_pkt.src == fwd_pkt.dst
    assert orig_pkt.dst == fwd_pkt.src
    assert len(orig_pkt) == len(fwd_pkt)
    # Check IP
    fwd_ip = fwd_pkt[sc.IPv6]
    orig_ip = orig_pkt[sc.IPv6]
    # Forwarding must not rewrite either address.
    assert orig_ip.src == fwd_ip.src
    assert orig_ip.dst == fwd_ip.dst
    assert orig_ip.hlim == fwd_ip.hlim + 1
    # Check ICMPv6
    assert bytes(orig_ip.payload) == bytes(fwd_ip.payload)


def fwd_ip6_icmp(args):
    """
    Sends ICMPv6 packet via args.iface interface.
    Receives and checks the forwarded packet.
    Assumes forwarding router decrements hop limit

    On a failed check the packets involved are printed and an
    Exception chained to the original failure is raised.
    """

    def filter_f(x):
        # Forwarded frame comes back from the router's MAC and is
        # an ICMPv6 echo request.
        return x.src == args.dmac and is_icmp6_echo_request(x)

    ether = sc.Ether(src=args.smac, dst=args.dmac)
    ip = sc.IPv6(src=args.sip, dst=args.dip)
    icmp = sc.ICMPv6EchoRequest()
    pkt = ether / ip / icmp

    packets = _send_and_sniff(args, pkt, filter_f)
    assert len(packets) > 0, 'no forwarded packet captured'
    fwd_pkt = packets[-1]
    try:
        check_forwarded_ip6_packet(pkt, fwd_pkt)
    except Exception as exc:
        _dump_packets(pkt, fwd_pkt, packets)
        raise Exception('forwarded IPv6 packet check failed') from exc


def main():
    """
    Looks up the module-level function named by --test_name and runs
    it with the parsed arguments. Exits with an error message if no
    callable of that name exists.
    """
    args = parse_args()
    test_ptr = globals().get(args.test_name)
    if not callable(test_ptr):
        raise SystemExit('unknown test name: {}'.format(args.test_name))
    test_ptr(args)


if __name__ == '__main__':
    main()