xref: /freebsd/tests/sys/netpfil/common/pft_ping.py (revision c66ec88fed842fbaad62c30d510644ceb7bd2d71)
1#!/usr/bin/env python
2#
3# SPDX-License-Identifier: BSD-2-Clause
4#
5# Copyright (c) 2017 Kristof Provost <kp@FreeBSD.org>
6#
7# Redistribution and use in source and binary forms, with or without
8# modification, are permitted provided that the following conditions
9# are met:
10# 1. Redistributions of source code must retain the above copyright
11#    notice, this list of conditions and the following disclaimer.
12# 2. Redistributions in binary form must reproduce the above copyright
13#    notice, this list of conditions and the following disclaimer in the
14#    documentation and/or other materials provided with the distribution.
15#
16# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
17# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19# ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
20# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
21# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
22# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
23# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
24# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
25# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
26# SUCH DAMAGE.
27#
28
29import argparse
30import scapy.all as sp
31import socket
32import sys
33from sniffer import Sniffer
34
# Marker payload placed in the echo requests we send, so the check functions
# can tell our own packets apart from other traffic.
PAYLOAD_MAGIC = bytes((0x42, 0xc0, 0xff, 0xee))
36
def check_ping_request(args, packet):
	"""
	Check packet with the IPv6 echo request check when args.ip6 is set, and
	with the IPv4 one otherwise. Returns that check's result.
	"""
	checker = check_ping6_request if args.ip6 else check_ping4_request
	return checker(args, packet)
42
def check_ping4_request(args, packet):
	"""
	Verify that the packet matches the IPv4 echo request we'd have sent

	args.to[0] is the expected destination address and args.expect_tos, when
	set, holds the expected ToS value. packet is the scapy packet to inspect.

	Returns True if the packet is an ICMP echo request to that destination
	carrying PAYLOAD_MAGIC and meeting the ToS expectation, False otherwise.
	"""
	dst_ip = args.to[0]

	ip = packet.getlayer(sp.IP)
	if not ip:
		return False
	if ip.dst != dst_ip:
		return False

	icmp = packet.getlayer(sp.ICMP)
	if not icmp:
		return False
	# Use .get(): an ICMP type that scapy's icmptypes table does not know is
	# simply not our packet, and must not blow up the check with a KeyError.
	if sp.icmptypes.get(icmp.type) != 'echo-request':
		return False

	raw = packet.getlayer(sp.Raw)
	if not raw:
		return False
	if raw.load != PAYLOAD_MAGIC:
		return False

	# Wait to check expectations until we've established this is the packet we
	# sent.
	if args.expect_tos:
		expected_tos = int(args.expect_tos[0])
		if ip.tos != expected_tos:
			print("Unexpected ToS value %d, expected %d" \
				% (ip.tos, expected_tos))
			return False

	return True
76
def check_ping6_request(args, packet):
	"""
	Check whether packet is the ICMPv6 echo request we'd have sent: an IPv6
	packet to args.to[0] whose echo request data equals PAYLOAD_MAGIC.

	Returns True on a match, False otherwise.
	"""
	expected_dst = args.to[0]

	ip6 = packet.getlayer(sp.IPv6)
	if not ip6 or ip6.dst != expected_dst:
		return False

	echo = packet.getlayer(sp.ICMPv6EchoRequest)
	if not echo or echo.data != PAYLOAD_MAGIC:
		return False

	return True
96
def ping(send_if, dst_ip, args):
	"""
	Send one ICMP echo request with PAYLOAD_MAGIC as payload to dst_ip,
	out of interface send_if.

	If args.send_tos is set, its first element is used as the IP ToS value.
	"""
	ip = sp.IP(dst=dst_ip)
	if args.send_tos:
		ip.tos = int(args.send_tos[0])

	echo = sp.ICMP(type='echo-request')
	payload = sp.raw(PAYLOAD_MAGIC)

	sp.sendp(sp.Ether() / ip / echo / payload, iface=send_if, verbose=False)
108
def ping6(send_if, dst_ip, args):
	"""
	Send one ICMPv6 echo request with PAYLOAD_MAGIC as data to dst_ip,
	out of interface send_if.

	args is not used by this function.
	"""
	echo = sp.ICMPv6EchoRequest(data=sp.raw(PAYLOAD_MAGIC))
	frame = sp.Ether() / sp.IPv6(dst=dst_ip) / echo
	sp.sendp(frame, iface=send_if, verbose=False)
116
def check_tcpsyn(args, packet):
	"""
	Verify that the packet is an IPv4 TCP packet to our destination with
	correct IP and TCP checksums

	args.to[0] is the expected destination address; packet is the scapy
	packet to inspect. The chksum fields of the IP and TCP layers obtained
	from packet are overwritten with None while verifying.

	Returns True if the packet matches and both checksums are correct,
	False otherwise. A message is printed on a checksum mismatch.
	"""
	dst_ip = args.to[0]

	ip = packet.getlayer(sp.IP)
	if not ip:
		return False
	if ip.dst != dst_ip:
		return False

	tcp = packet.getlayer(sp.TCP)
	if not tcp:
		return False

	# Verify IP checksum: clear the field, rebuild and re-parse so scapy
	# computes a fresh value, then compare it with the one we received.
	chksum = ip.chksum
	ip.chksum = None
	new_chksum = sp.IP(sp.raw(ip)).chksum
	if chksum != new_chksum:
		print("Expected IP checksum %x but found %x\n" % (new_chksum, chksum))
		return False

	# Verify TCP checksum the same way, rebuilding from the Ethernet layer
	# down.
	chksum = tcp.chksum
	tcp.chksum = None
	newpacket = sp.Ether(sp.raw(packet[sp.Ether]))
	new_chksum = newpacket[sp.TCP].chksum
	if chksum != new_chksum:
		print("Expected TCP checksum %x but found %x\n" % (new_chksum, chksum))
		return False

	return True
149
def tcpsyn(send_if, dst_ip, args):
	"""
	Send a TCP SYN for port 666 to dst_ip, out of interface send_if.

	The SYN carries Timestamp and MSS options; when args.tcpopt_unaligned is
	set a NOP option is placed in front of them.
	"""
	options = [('Timestamp', (1, 1)), ('MSS', 1280)]
	if args.tcpopt_unaligned:
		options.insert(0, ('NOP', 0))

	syn = sp.TCP(dport=666, flags='S', options=options)
	frame = sp.Ether() / sp.IP(dst=dst_ip) / syn
	sp.sendp(frame, iface=send_if, verbose=False)
162
163
def main():
	"""
	Parse the command line and send the requested packet (TCP SYN, ICMPv6
	echo request or ICMP echo request).

	When --recvif is given a Sniffer is started before sending, and the
	process exits with 0 if it found the correct packet and 1 if not.
	"""
	parser = argparse.ArgumentParser("pft_ping.py",
		description="Ping test tool")
	parser.add_argument('--sendif', nargs=1, required=True,
		help='The interface through which the packet(s) will be sent')
	parser.add_argument('--recvif', nargs=1,
		help='The interface on which to expect the ICMP echo response')
	parser.add_argument('--ip6', action='store_true', help='Use IPv6')
	parser.add_argument('--to', nargs=1, required=True,
		help='The destination IP address for the ICMP echo request')

	# TCP options
	parser.add_argument('--tcpsyn', action='store_true',
		help='Send a TCP SYN packet')
	parser.add_argument('--tcpopt_unaligned', action='store_true',
		help='Include unaligned TCP options')

	# Packet settings
	parser.add_argument('--send-tos', nargs=1,
		help='Set the ToS value for the transmitted packet')

	# Expectations
	parser.add_argument('--expect-tos', nargs=1,
		help='The expected ToS value in the received packet')

	args = parser.parse_args()
	send_if = args.sendif[0]
	dst_ip = args.to[0]

	# There may be no default route, so point scapy at the sending interface
	# as the place to start looking for routes.
	sp.conf.iface6 = send_if

	# Start sniffing before anything is sent.
	sniffer = None
	if args.recvif is not None:
		checkfn = check_tcpsyn if args.tcpsyn else check_ping_request
		sniffer = Sniffer(args, checkfn)

	if args.tcpsyn:
		send = tcpsyn
	elif args.ip6:
		send = ping6
	else:
		send = ping
	send(send_if, dst_ip, args)

	if not sniffer:
		return

	sniffer.join()
	sys.exit(0 if sniffer.foundCorrectPacket else 1)
220
# Run main() only when this file is executed as a script, not when imported.
if __name__ == '__main__':
	main()
223