#!/usr/bin/env python
#-
# SPDX-License-Identifier: BSD-2-Clause
#
# Copyright (c) 2019 Netflix, Inc.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
# 1. Redistributions of source code must retain the above copyright
#    notice, this list of conditions and the following disclaimer.
# 2. Redistributions in binary form must reproduce the above copyright
#    notice, this list of conditions and the following disclaimer in the
#    documentation and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
# SUCH DAMAGE.
#
#

import argparse
import logging
# Must run before scapy is imported: it raises the level of the "scapy"
# logger to CRITICAL so anything less severe logged during import is dropped.
logging.getLogger("scapy").setLevel(logging.CRITICAL)
import scapy.all as sp
import socket
import sys
import frag6.sniffer as Sniffer
from time import sleep


def _reply_layer(args, packet, layer):
    """Return `layer` from `packet` if it is an IPv6 packet sent back to us.

    "Sent back to us" means the IPv6 destination of `packet` equals the
    source address of the probe described by `args.src[0]` / `args.to[0]`.
    Returns None when the packet has no IPv6 layer, is addressed to someone
    else, or does not carry `layer`.
    """
    ip6 = packet.getlayer(sp.IPv6)
    if not ip6:
        return None
    # The probe header is rebuilt and its source read back rather than
    # comparing against the raw argument string.
    # NOTE(review): presumably this lets scapy normalise the textual
    # address before the comparison -- confirm before simplifying.
    oip6 = sp.IPv6(src=args.src[0], dst=args.to[0])
    if ip6.dst != oip6.src:
        return None
    found = packet.getlayer(layer)
    if not found:
        return None
    return found


def check_icmp6_error_dst_unreach_noport(args, packet):
    """Sniffer check: is `packet` an ICMPv6 Dest Unreach / Port Unreach?

    Returns True only for an ICMPv6 Destination Unreachable with code 4
    that is addressed to the probe's source address, False otherwise.
    """
    icmp6 = _reply_layer(args, packet, sp.ICMPv6DestUnreach)
    if icmp6 is None:
        return False
    # ICMP6_DST_UNREACH_NOPORT 4
    # Should we check the payload as well?
    # We are running in a very isolated environment and nothing else
    # should trigger an ICMPv6 Dest Unreach / Port Unreach so leave it.
    return icmp6.code == 4


def check_icmp6_error_paramprob_header(args, packet):
    """Sniffer check: is `packet` an ICMPv6 Param Problem (bad header)?

    Returns True only for an ICMPv6 Parameter Problem with code 0 that is
    addressed to the probe's source address, False otherwise.
    """
    icmp6 = _reply_layer(args, packet, sp.ICMPv6ParamProblem)
    if icmp6 is None:
        return False
    # ICMP6_PARAMPROB_HEADER 0
    # Should we check the payload as well?
    # We are running in a very isolated environment and nothing else
    # should trigger an ICMPv6 Param Prob so leave it.
    return icmp6.code == 0


def check_tcp_rst(args, packet):
    """Sniffer check: is `packet` a TCP segment with the RST flag set?

    Returns True only for a TCP segment with RST (0x04) set that is
    addressed to the probe's source address, False otherwise.
    """
    tcp = _reply_layer(args, packet, sp.TCP)
    if tcp is None:
        return False
    # Is TCP RST?
    return bool(tcp.flags & 0x04)


def addExt(ext, h):
    """Append header `h` to the header chain `ext`.

    Either argument may be None: a None `h` leaves `ext` untouched, and a
    None `ext` makes `h` the start of the chain.
    """
    if h is None:
        return ext
    if ext is None:
        return h
    return ext / h


def _hop_by_hop():
    """Build a Hop-by-Hop options header carrying six bytes of PadN."""
    return sp.IPv6ExtHdrHopByHop(
        options=sp.PadN(optdata="\x00\x00\x00\x00\x00\x00"))


def _routing():
    """Build a type 0 Routing header."""
    return sp.IPv6ExtHdrRouting(type=0)


def _fragment():
    """Build a Fragment header (offset 0, no more fragments, id 0x1234)."""
    return sp.IPv6ExtHdrFragment(offset=0, m=0, id=0x1234)


def _dest_opts():
    """Build a Destination Options header carrying six bytes of PadN."""
    return sp.IPv6ExtHdrDestOpt(
        options=sp.PadN(optdata="\x00\x00\x00\x00\x00\x00"))


def getExtHdrs(args):
    """Build the chain of extension headers selected on the command line.

    Headers are chained in the fixed order of the table below.  Returns the
    chained scapy headers, or None when no implemented header was requested.
    """
    # XXX-TODO Try to put them in an order which could make sense
    # in real life packets and according to the RFCs.
    #
    # (requested, builder); a builder of None marks a header which is
    # accepted on the command line but not implemented yet (XXX TODO),
    # so requesting it adds nothing to the chain.
    table = (
        (args.hbh, _hop_by_hop),
        (args.rh, _routing),
        (args.frag6, _fragment),
        (args.esp, None),
        (args.ah, None),
        (args.dest, _dest_opts),
        (args.mobi, None),
        (args.hip, None),
        (args.shim6, None),
        (args.proto253, None),
        (args.proto254, None),
        # Same header as --hbh, but added last, i.e. at an invalid position.
        (args.hbhbad, _hop_by_hop),
    )

    ext = None
    for requested, build in table:
        if requested and build is not None:
            ext = addExt(ext, build())
    return ext


def _parse_args():
    """Parse the command line and return the argparse namespace."""
    parser = argparse.ArgumentParser("exthdr.py",
        description="IPv6 extension header test tool")

    # Mandatory single-value options; each is stored as a one-element list.
    for flag, text in (
        ('--sendif', 'The interface through which the packet will be sent'),
        ('--recvif', 'The interface on which to check for the packet'),
        ('--src', 'The source IP address'),
        ('--to', 'The destination IP address'),
    ):
        parser.add_argument(flag, nargs=1, required=True, help=text)

    parser.add_argument('--debug',
        required=False, action='store_true',
        help='Enable test debugging')

    # Extension Headers
    # See https://www.iana.org/assignments/ipv6-parameters/ipv6-parameters.xhtml
    for flag, text in (
        ('--hbh', 'Add IPv6 Hop-by-Hop Option'),
        ('--hbhbad', 'Add IPv6 Hop-by-Hop Option at an invalid position'),
        ('--rh', 'Add Routing Header for IPv6'),
        ('--frag6', 'Add Fragment Header for IPv6'),
        ('--esp', 'Add Encapsulating Security Payload'),
        ('--ah', 'Add Authentication Header'),
        ('--dest', 'Add Destination Options for IPv6'),
        ('--mobi', 'Add Mobility Header'),
        ('--hip', 'Add Host Identity Protocol'),
        ('--shim6', 'Add Shim6 Protocol'),
        ('--proto253', 'Use for experimentation and testing (253)'),
        ('--proto254', 'Use for experimentation and testing (254)'),
    ):
        parser.add_argument(flag, required=False, action='store_true',
            help=text)

    return parser.parse_args()


def main():
    """Send the probes and exit with the test result.

    One UDP and one TCP probe are sent, each behind the requested extension
    headers, while a sniffer watches for the matching reply.  The process
    always ends through sys.exit(): status 0 normally, status 1 when
    --hbhbad was given and the ICMPv6 Parameter Problem was not seen.
    """
    args = _parse_args()

    # With a Hop-by-Hop header at an invalid position we expect an error
    # back; otherwise we expect the regular reply for the upper layer.
    ok = not args.hbhbad

    ########################################################################
    #
    # Send IPv6 packets with one or more extension headers (combinations
    # might not always make sense depending what user tells us).
    # We are trying to cover the basic loop and passing mbufs on
    # and making sure m_pullup() works.
    # Try for at least UDP and TCP upper layer payloads.
    #
    # Expectations: no panics
    # We are not testing for any other outcome here.
    #
    data = "6" * 88
    udp = sp.UDP(dport=3456, sport=6543) / data
    tcp = sp.TCP(dport=4567, sport=7654)
    ip6 = sp.Ether() / sp.IPv6(src=args.src[0], dst=args.to[0])

    # Each upper layer payload is paired with the reply it normally
    # provokes, instead of comparing packets to pick the checker.
    probes = (
        (udp, check_icmp6_error_dst_unreach_noport),
        (tcp, check_tcp_rst),
    )
    for ulp, reply_check in probes:
        ext = getExtHdrs(args)
        if ext is not None:
            pkt = ip6 / ext / ulp
        else:
            pkt = ip6 / ulp
        if args.debug:
            pkt.display()

        sc = reply_check if ok else check_icmp6_error_paramprob_header

        # Start sniffing on recvif
        sniffer = Sniffer.Sniffer(args, sc)
        try:
            sp.sendp(pkt, iface=args.sendif[0], verbose=False)
            sleep(0.10)
        finally:
            # Always stop and join the sniffer, even if sending failed,
            # so it is not left running behind a traceback.
            sniffer.setEnd()
            sniffer.join()

        if not sniffer.foundCorrectPacket:
            # Exit status 1 if the expected Parameter Problem is missing.
            # NOTE(review): when no bad header was requested, a missing
            # reply ends the run with status 0 right here, before any
            # remaining payload is sent.  Kept as-is because the stated
            # expectation above is only "no panics".
            sys.exit(0 if ok else 1)

    sys.exit(0)


if __name__ == '__main__':
    main()