#!/usr/bin/env python
#-
# SPDX-License-Identifier: BSD-2-Clause
#
# Copyright (c) 2019 Netflix, Inc.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
# 1. Redistributions of source code must retain the above copyright
#    notice, this list of conditions and the following disclaimer.
# 2. Redistributions in binary form must reproduce the above copyright
#    notice, this list of conditions and the following disclaimer in the
#    documentation and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
# SUCH DAMAGE.
#
# $FreeBSD$
#

import argparse
import logging
logging.getLogger("scapy").setLevel(logging.CRITICAL)
import scapy.all as sp
import socket
import sys
import frag6.sniffer as Sniffer
from time import sleep


def _addressed_to_our_source(args, packet):
    """Tell whether `packet` is an IPv6 packet destined to args.src[0].

    Returns False when the packet has no IPv6 layer or when its
    destination differs from the source field of the IPv6 header this
    tool sends (src=args.src[0], dst=args.to[0]).
    """
    reply = packet.getlayer(sp.IPv6)
    if not reply:
        return False
    # Build the same IPv6 header we send and compare against its
    # source field.
    probe = sp.IPv6(src=args.src[0], dst=args.to[0])
    if reply.dst != probe.src:
        return False
    return True


def check_icmp6_error_dst_unreach_noport(args, packet):
    """Match an ICMPv6 Destination Unreachable / Port Unreachable reply.

    Returns True only if `packet` is addressed to our source address and
    carries an ICMPv6DestUnreach layer with code 4.
    """
    if not _addressed_to_our_source(args, packet):
        return False
    unreach = packet.getlayer(sp.ICMPv6DestUnreach)
    if not unreach:
        return False
    # ICMP6_DST_UNREACH_NOPORT is code 4.
    #
    # The payload is deliberately not inspected: we run in a very
    # isolated environment and nothing else should trigger an
    # ICMPv6 Dest Unreach / Port Unreach.
    return unreach.code == 4


def check_icmp6_error_paramprob_header(args, packet):
    """Match an ICMPv6 Parameter Problem / erroneous header field reply.

    Returns True only if `packet` is addressed to our source address and
    carries an ICMPv6ParamProblem layer with code 0.
    """
    if not _addressed_to_our_source(args, packet):
        return False
    paramprob = packet.getlayer(sp.ICMPv6ParamProblem)
    if not paramprob:
        return False
    # ICMP6_PARAMPROB_HEADER is code 0.
    #
    # The payload is deliberately not inspected: we run in a very
    # isolated environment and nothing else should trigger an
    # ICMPv6 Param Prob.
    return paramprob.code == 0


def check_tcp_rst(args, packet):
    """Match a TCP segment with the RST flag set.

    Returns True only if `packet` is addressed to our source address,
    has a TCP layer, and that layer has the RST bit (0x04) set.
    """
    if not _addressed_to_our_source(args, packet):
        return False
    segment = packet.getlayer(sp.TCP)
    if not segment:
        return False
    # 0x04 is the RST bit in the TCP flags.
    return bool(segment.flags & 0x04)


def addExt(ext, h):
    """Append header `h` to the chain `ext` and return the new chain.

    A `h` of None leaves `ext` untouched; an `ext` of None means the
    chain is still empty, so `h` becomes the chain.
    """
    if h is None:
        return ext
    return h if ext is None else ext / h


def getExtHdrs(args):
    """Build the extension header chain selected on the command line.

    Returns the stacked scapy headers, or None if no implemented
    extension header was requested.
    """

    def padded_options():
        # PadN option carrying six zero bytes of option data.
        return sp.PadN(optdata="\x00\x00\x00\x00\x00\x00")

    def not_implemented():
        # XXX TODO: no header is generated for this option yet.
        return None

    # XXX-TODO Try to put them in an order which could make sense
    # in real life packets and according to the RFCs.
    #
    # (requested?, builder) pairs, applied in this order.  The builder
    # is only invoked when the option was requested.
    selection = (
        (args.hbh,
            lambda: sp.IPv6ExtHdrHopByHop(options=padded_options())),
        (args.rh,
            lambda: sp.IPv6ExtHdrRouting(type=0)),
        (args.frag6,
            lambda: sp.IPv6ExtHdrFragment(offset=0, m=0, id=0x1234)),
        (args.esp, not_implemented),
        (args.ah, not_implemented),
        (args.dest,
            lambda: sp.IPv6ExtHdrDestOpt(options=padded_options())),
        (args.mobi, not_implemented),
        (args.hip, not_implemented),
        (args.shim6, not_implemented),
        (args.proto253, not_implemented),
        (args.proto254, not_implemented),
        # The "bad" Hop-by-Hop header goes last, after all others.
        (args.hbhbad,
            lambda: sp.IPv6ExtHdrHopByHop(options=padded_options())),
    )

    chain = None
    for requested, build in selection:
        if requested:
            chain = addExt(chain, build())
    return chain


def main():
    """Parse the command line, send the test packets and check replies.

    Exits with status 0 on success.  If the expected reply is not seen
    the exit status is `not ok`, i.e. a failure is only reported when
    --hbhbad was given.
    """
    parser = argparse.ArgumentParser("exthdr.py",
        description="IPv6 extension header test tool")

    # Mandatory single-value options.
    for flag, text in (
        ('--sendif', 'The interface through which the packet will be sent'),
        ('--recvif', 'The interface on which to check for the packet'),
        ('--src', 'The source IP address'),
        ('--to', 'The destination IP address'),
    ):
        parser.add_argument(flag, nargs=1, required=True, help=text)

    parser.add_argument('--debug', required=False, action='store_true',
        help='Enable test debugging')

    # Extension Headers
    # See https://www.iana.org/assignments/ipv6-parameters/ipv6-parameters.xhtml
    for flag, text in (
        ('--hbh', 'Add IPv6 Hop-by-Hop Option'),
        ('--hbhbad', 'Add IPv6 Hop-by-Hop Option at an invalid position'),
        ('--rh', 'Add Routing Header for IPv6'),
        ('--frag6', 'Add Fragment Header for IPv6'),
        ('--esp', 'Add Encapsulating Security Payload'),
        ('--ah', 'Add Authentication Header'),
        ('--dest', 'Add Destination Options for IPv6'),
        ('--mobi', 'Add Mobility Header'),
        ('--hip', 'Add Host Identity Protocol'),
        ('--shim6', 'Add Shim6 Protocol'),
        ('--proto253', 'Use for experimentation and testing (253)'),
        ('--proto254', 'Use for experimentation and testing (254)'),
    ):
        parser.add_argument(flag, required=False, action='store_true',
            help=text)

    args = parser.parse_args()

    # With a misplaced Hop-by-Hop header we expect an error reply.
    ok = 0 if args.hbhbad else 1

    ########################################################################
    #
    # Send IPv6 packets with one or more extension headers (combinations
    # might not always make sense depending what user tells us).
    # We are trying to cover the basic loop and passing mbufs on
    # and making sure m_pullup() works.
    # Try for at least UDP and TCP upper layer payloads.
    #
    # Expectations: no panics
    # We are not testing for any other outcome here.
    #
    data = "6" * 88
    udp = sp.UDP(dport=3456, sport=6543) / data
    tcp = sp.TCP(dport=4567, sport=7654)
    ip6 = sp.Ether() / sp.IPv6(src=args.src[0], dst=args.to[0])

    # Upper layer payload paired with the reply check used when the
    # packet is expected to be accepted.
    probes = (
        (udp, check_icmp6_error_dst_unreach_noport),
        (tcp, check_tcp_rst),
    )
    for ulp, regular_check in probes:
        ext = getExtHdrs(args)
        pkt = ip6 / ulp if ext is None else ip6 / ext / ulp
        if args.debug:
            pkt.display()

        # A bad packet should draw a Parameter Problem whatever the ULP.
        sc = regular_check if ok else check_icmp6_error_paramprob_header

        # Start sniffing on recvif
        sniffer = Sniffer.Sniffer(args, sc)
        sp.sendp(pkt, iface=args.sendif[0], verbose=False)
        sleep(0.10)
        sniffer.setEnd()
        sniffer.join()
        if not sniffer.foundCorrectPacket:
            sys.exit(not ok)

    sys.exit(0)


if __name__ == '__main__':
    main()