1#!/usr/bin/env python 2# - 3# SPDX-License-Identifier: BSD-2-Clause 4# 5# Copyright (c) 2020 Alexander V. Chernikov 6# 7# Redistribution and use in source and binary forms, with or without 8# modification, are permitted provided that the following conditions 9# are met: 10# 1. Redistributions of source code must retain the above copyright 11# notice, this list of conditions and the following disclaimer. 12# 2. Redistributions in binary form must reproduce the above copyright 13# notice, this list of conditions and the following disclaimer in the 14# documentation and/or other materials provided with the distribution. 15# 16# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND 17# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 18# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 19# ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE 20# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL 21# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS 22# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) 23# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 24# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY 25# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 26# SUCH DAMAGE. 27# 28# 29 30 31from functools import partial 32import socket 33import logging 34logging.getLogger("scapy").setLevel(logging.CRITICAL) 35import scapy.all as sc 36import argparse 37import time 38 39 40def parse_args(): 41 parser = argparse.ArgumentParser(description='divert socket tester') 42 parser.add_argument('--dip', type=str, help='destination packet IP') 43 parser.add_argument('--sip', type=str, help='source packet IP') 44 parser.add_argument('--dmac', type=str, help='packet dst mac') 45 parser.add_argument('--smac', type=str, help='packet src mac') 46 parser.add_argument('--iface', type=str, help='interface to use') 47 parser.add_argument('--test_name', type=str, required=True, 48 help='test name to run') 49 return parser.parse_args() 50 51 52def send_packet(args, pkt): 53 sc.sendp(pkt, iface=args.iface, verbose=False) 54 55 56def is_icmp6_echo_request(pkt): 57 return pkt.type == 0x86DD and pkt.payload.nh == 58 and \ 58 pkt.payload.payload.type == 128 59 60 61def check_forwarded_ip_packet(orig_pkt, fwd_pkt): 62 """ 63 Checks that forwarded ICMP packet @fwd_ptk is the same as 64 @orig_pkt. Assumes router-on-the-stick forwarding behaviour: 65 * src/dst macs are swapped 66 * TTL is decremented 67 """ 68 # Check ether fields 69 assert orig_pkt.src == fwd_pkt.dst 70 assert orig_pkt.dst == fwd_pkt.src 71 assert len(orig_pkt) == len(fwd_pkt) 72 # Check IP 73 fwd_ip = fwd_pkt[sc.IP] 74 orig_ip = orig_pkt[sc.IP] 75 assert orig_ip.src == orig_ip.src 76 assert orig_ip.dst == fwd_ip.dst 77 assert orig_ip.ttl == fwd_ip.ttl + 1 78 # Check ICMP 79 fwd_icmp = fwd_ip[sc.ICMP] 80 orig_icmp = orig_ip[sc.ICMP] 81 assert bytes(orig_ip.payload) == bytes(fwd_ip.payload) 82 83 84def fwd_ip_icmp_fast(args): 85 """ 86 Sends ICMP packet via args.iface interface. 87 Receives and checks the forwarded packet. 88 Assumes forwarding router decrements TTL 89 """ 90 91 def filter_f(x): 92 return x.src == args.dmac and x.type == 0x0800 93 94 e = sc.Ether(src=args.smac, dst=args.dmac) 95 ip = sc.IP(src=args.sip, dst=args.dip) 96 icmp = sc.ICMP(type='echo-request') 97 pkt = e / ip / icmp 98 99 send_cb = partial(send_packet, args, pkt) 100 packets = sc.sniff(iface=args.iface, started_callback=send_cb, 101 stop_filter=filter_f, lfilter=filter_f, timeout=5) 102 assert len(packets) > 0 103 fwd_pkt = packets[-1] 104 try: 105 check_forwarded_ip_packet(pkt, fwd_pkt) 106 except Exception as e: 107 print('Original packet:') 108 pkt.show() 109 print('Forwarded packet:') 110 fwd_pkt.show() 111 for a_packet in packets: 112 a_packet.summary() 113 raise Exception from e 114 115 116def fwd_ip_icmp_slow(args): 117 """ 118 Sends ICMP packet via args.iface interface. 119 Forces slow path processing by introducing IP option. 120 Receives and checks the forwarded packet. 121 Assumes forwarding router decrements TTL 122 """ 123 124 def filter_f(x): 125 return x.src == args.dmac and x.type == 0x0800 126 127 e = sc.Ether(src=args.smac, dst=args.dmac) 128 # Add IP option to switch to 'normal' IP processing 129 stream_id = sc.IPOption_Stream_Id(security=0xFFFF) 130 ip = sc.IP(src=args.sip, dst=args.dip, 131 options=[sc.IPOption_Stream_Id(security=0xFFFF)]) 132 icmp = sc.ICMP(type='echo-request') 133 pkt = e / ip / icmp 134 135 send_cb = partial(send_packet, args, pkt) 136 packets = sc.sniff(iface=args.iface, started_callback=send_cb, 137 stop_filter=filter_f, lfilter=filter_f, timeout=5) 138 assert len(packets) > 0 139 check_forwarded_ip_packet(pkt, packets[-1]) 140 141 142def check_forwarded_ip6_packet(orig_pkt, fwd_pkt): 143 """ 144 Checks that forwarded ICMP packet @fwd_ptk is the same as 145 @orig_pkt. Assumes router-on-the-stick forwarding behaviour: 146 * src/dst macs are swapped 147 * TTL is decremented 148 """ 149 # Check ether fields 150 assert orig_pkt.src == fwd_pkt.dst 151 assert orig_pkt.dst == fwd_pkt.src 152 assert len(orig_pkt) == len(fwd_pkt) 153 # Check IP 154 fwd_ip = fwd_pkt[sc.IPv6] 155 orig_ip = orig_pkt[sc.IPv6] 156 assert orig_ip.src == orig_ip.src 157 assert orig_ip.dst == fwd_ip.dst 158 assert orig_ip.hlim == fwd_ip.hlim + 1 159 # Check ICMPv6 160 assert bytes(orig_ip.payload) == bytes(fwd_ip.payload) 161 162 163def fwd_ip6_icmp(args): 164 """ 165 Sends ICMPv6 packet via args.iface interface. 166 Receives and checks the forwarded packet. 167 Assumes forwarding router decrements TTL 168 """ 169 170 def filter_f(x): 171 return x.src == args.dmac and is_icmp6_echo_request(x) 172 173 e = sc.Ether(src=args.smac, dst=args.dmac) 174 ip = sc.IPv6(src=args.sip, dst=args.dip) 175 icmp = sc.ICMPv6EchoRequest() 176 pkt = e / ip / icmp 177 178 send_cb = partial(send_packet, args, pkt) 179 packets = sc.sniff(iface=args.iface, started_callback=send_cb, 180 stop_filter=filter_f, lfilter=filter_f, timeout=5) 181 assert len(packets) > 0 182 fwd_pkt = packets[-1] 183 try: 184 check_forwarded_ip6_packet(pkt, fwd_pkt) 185 except Exception as e: 186 print('Original packet:') 187 pkt.show() 188 print('Forwarded packet:') 189 fwd_pkt.show() 190 for idx, a_packet in enumerate(packets): 191 print('{}: {}'.format(idx, a_packet.summary())) 192 raise Exception from e 193 194 195def main(): 196 args = parse_args() 197 test_ptr = globals()[args.test_name] 198 test_ptr(args) 199 200 201if __name__ == '__main__': 202 main() 203