xref: /freebsd/tests/sys/netpfil/common/pft_ping.py (revision 21b492ed51aa6ff8008a8aa83333b1de30288a15)
1#!/usr/bin/env python3
2#
3# SPDX-License-Identifier: BSD-2-Clause
4#
5# Copyright (c) 2017 Kristof Provost <kp@FreeBSD.org>
6#
7# Redistribution and use in source and binary forms, with or without
8# modification, are permitted provided that the following conditions
9# are met:
10# 1. Redistributions of source code must retain the above copyright
11#    notice, this list of conditions and the following disclaimer.
12# 2. Redistributions in binary form must reproduce the above copyright
13#    notice, this list of conditions and the following disclaimer in the
14#    documentation and/or other materials provided with the distribution.
15#
16# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
17# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19# ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
20# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
21# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
22# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
23# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
24# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
25# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
26# SUCH DAMAGE.
27#
28
29import argparse
30import logging
31logging.getLogger("scapy").setLevel(logging.CRITICAL)
32import scapy.all as sp
33import socket
34import sys
35from sniffer import Sniffer
36
# Marker payload carried by the echo requests built in ping() and ping6();
# the check_* functions compare sniffed payloads against it.
PAYLOAD_MAGIC = b'\x42\xc0\xff\xee'

# Number of packets check_dup() has matched so far.
dup_found = 0
40
def check_dup(args, packet):
	"""
	Count sniffed ICMP packets that carry our payload magic.

	Matching packets are tallied in the module-level dup_found counter.
	The return value is always False, whether or not the packet matched.
	"""
	global dup_found

	if not packet.getlayer(sp.ICMP):
		return False

	payload = packet.getlayer(sp.Raw)
	if payload and payload.load == PAYLOAD_MAGIC:
		dup_found += 1

	# Never report a match; only the counter records what was seen.
	return False
59
def check_ping_request(args, packet):
	"""
	Run the IPv6 echo request check when args.ip6 is set, the IPv4 one
	otherwise, and return its verdict.
	"""
	checker = check_ping6_request if args.ip6 else check_ping4_request
	return checker(args, packet)
65
def check_ping4_request(args, packet):
	"""
	Verify that the packet matches what we'd have sent

	The packet must be an IPv4 ICMP echo request addressed to args.to[0]
	with PAYLOAD_MAGIC as its raw payload. If args.expect_tos is set, the
	ToS field must also equal that value.

	Returns True for a matching packet, False otherwise.
	"""
	dst_ip = args.to[0]

	ip = packet.getlayer(sp.IP)
	if not ip:
		return False
	if ip.dst != dst_ip:
		return False

	icmp = packet.getlayer(sp.ICMP)
	if not icmp:
		return False
	# Look the type up with .get(): an ICMP type that is missing from
	# scapy's name table must simply not match, not raise KeyError.
	if sp.icmptypes.get(icmp.type) != 'echo-request':
		return False

	raw = packet.getlayer(sp.Raw)
	if not raw:
		return False
	if raw.load != PAYLOAD_MAGIC:
		return False

	# Wait to check expectations until we've established this is the packet we
	# sent.
	if args.expect_tos:
		expected_tos = int(args.expect_tos[0])
		if ip.tos != expected_tos:
			print("Unexpected ToS value %d, expected %d" \
				% (ip.tos, expected_tos))
			return False

	return True
99
def check_ping6_request(args, packet):
	"""
	Decide whether the packet is the ICMPv6 echo request we sent.

	It must be an IPv6 packet addressed to args.to[0] whose echo request
	data equals PAYLOAD_MAGIC. When args.expect_tc is set, the traffic
	class must match that value too.
	"""
	target = args.to[0]

	ip6 = packet.getlayer(sp.IPv6)
	if not ip6 or ip6.dst != target:
		return False

	echo = packet.getlayer(sp.ICMPv6EchoRequest)
	if not echo or echo.data != PAYLOAD_MAGIC:
		return False

	# Only now that we know the packet is ours do the expectations apply.
	if args.expect_tc and ip6.tc != int(args.expect_tc[0]):
		print("Unexpected traffic class value %d, expected %d" \
			% (ip6.tc, int(args.expect_tc[0])))
		return False

	return True
127
def check_ping_reply(args, packet):
	"""
	Run the IPv6 echo reply check when args.ip6 is set, the IPv4 one
	otherwise, and return its verdict.
	"""
	checker = check_ping6_reply if args.ip6 else check_ping4_reply
	return checker(args, packet)
133
def check_ping4_reply(args, packet):
	"""
	Check that this is a reply to the ping request we sent

	The packet must be an IPv4 ICMP echo reply whose source address is
	args.to[0] and whose raw payload is PAYLOAD_MAGIC. If args.expect_tos
	is set, the ToS field must also equal that value.

	Returns True for a matching packet, False otherwise.
	"""
	dst_ip = args.to[0]

	ip = packet.getlayer(sp.IP)
	if not ip:
		return False
	# The reply comes back from the address we pinged.
	if ip.src != dst_ip:
		return False

	icmp = packet.getlayer(sp.ICMP)
	if not icmp:
		return False
	# Look the type up with .get(): an ICMP type that is missing from
	# scapy's name table must simply not match, not raise KeyError.
	if sp.icmptypes.get(icmp.type) != 'echo-reply':
		return False

	raw = packet.getlayer(sp.Raw)
	if not raw:
		return False
	if raw.load != PAYLOAD_MAGIC:
		return False

	if args.expect_tos:
		expected_tos = int(args.expect_tos[0])
		if ip.tos != expected_tos:
			print("Unexpected ToS value %d, expected %d" \
				% (ip.tos, expected_tos))
			return False

	return True
165
def check_ping6_reply(args, packet):
	"""
	Decide whether the packet is the ICMPv6 echo reply to our request.

	It must be an IPv6 packet sent from args.to[0] whose echo reply data
	equals PAYLOAD_MAGIC. When args.expect_tc is set, the traffic class
	must match that value too. A short diagnostic is printed when an IPv6
	packet from the target fails one of the later checks.
	"""
	target = args.to[0]

	ip6 = packet.getlayer(sp.IPv6)
	if not ip6 or ip6.src != target:
		return False

	reply = packet.getlayer(sp.ICMPv6EchoReply)
	if not reply:
		print("No echo reply!")
		return False
	if reply.data != PAYLOAD_MAGIC:
		print("data mismatch")
		return False

	if args.expect_tc and ip6.tc != int(args.expect_tc[0]):
		print("Unexpected traffic class value %d, expected %d" \
			% (ip6.tc, int(args.expect_tc[0])))
		return False

	return True
194
def ping(send_if, dst_ip, args):
	"""
	Send one IPv4 ICMP echo request with PAYLOAD_MAGIC as payload.

	send_if is the interface to transmit on and dst_ip the destination
	address. args.send_tos and args.fromaddr, when set, override the ToS
	field and the source address.
	"""
	ip_hdr = sp.IP(dst=dst_ip)
	if args.send_tos:
		ip_hdr.tos = int(args.send_tos[0])
	if args.fromaddr:
		ip_hdr.src = args.fromaddr[0]

	frame = (
		sp.Ether()
		/ ip_hdr
		/ sp.ICMP(type='echo-request')
		/ sp.raw(PAYLOAD_MAGIC)
	)
	sp.sendp(frame, iface=send_if, verbose=False)
209
def ping6(send_if, dst_ip, args):
	"""
	Send one ICMPv6 echo request with PAYLOAD_MAGIC as its data.

	send_if is the interface to transmit on and dst_ip the destination
	address. args.send_tc and args.fromaddr, when set, override the traffic
	class and the source address.
	"""
	ip6_hdr = sp.IPv6(dst=dst_ip)
	if args.send_tc:
		ip6_hdr.tc = int(args.send_tc[0])
	if args.fromaddr:
		ip6_hdr.src = args.fromaddr[0]

	echo = sp.ICMPv6EchoRequest(data=sp.raw(PAYLOAD_MAGIC))
	frame = sp.Ether() / ip6_hdr / echo
	sp.sendp(frame, iface=send_if, verbose=False)
223
def check_tcpsyn(args, packet):
	"""
	Verify that the packet is a TCP packet to args.to[0] with valid IP and
	TCP checksums

	Each checksum is checked by clearing the field, re-serialising the
	packet and comparing the value found after re-parsing with the one
	that was received. Note that this leaves the chksum fields of the
	passed-in packet's IP and TCP layers set to None.

	Returns True when both checksums agree, False otherwise.
	"""
	dst_ip = args.to[0]

	ip = packet.getlayer(sp.IP)
	if not ip:
		return False
	if ip.dst != dst_ip:
		return False

	tcp = packet.getlayer(sp.TCP)
	if not tcp:
		return False

	# Verify IP checksum
	chksum = ip.chksum
	ip.chksum = None
	new_chksum = sp.IP(sp.raw(ip)).chksum
	if chksum != new_chksum:
		print("Expected IP checksum %x but found %x\n" % (new_chksum, chksum))
		return False

	# Verify TCP checksum
	chksum = tcp.chksum
	tcp.chksum = None
	# Rebuild from the Ethernet layer down so the TCP layer is re-parsed
	# together with the IP header it belongs to.
	newpacket = sp.Ether(sp.raw(packet[sp.Ether]))
	new_chksum = newpacket[sp.TCP].chksum
	if chksum != new_chksum:
		print("Expected TCP checksum %x but found %x\n" % (new_chksum, chksum))
		return False

	return True
256
def tcpsyn(send_if, dst_ip, args):
	"""
	Send a TCP SYN to port 666 on dst_ip through interface send_if.

	The segment carries a Timestamp and an MSS option. With
	args.tcpopt_unaligned set, a NOP option is placed in front of them.
	"""
	tcp_options = [('Timestamp', (1, 1)), ('MSS', 1280)]
	if args.tcpopt_unaligned:
		tcp_options.insert(0, ('NOP', 0 ))

	syn = (
		sp.Ether()
		/ sp.IP(dst=dst_ip)
		/ sp.TCP(dport=666, flags='S', options=tcp_options)
	)
	sp.sendp(syn, iface=send_if, verbose=False)
269
270
def main():
	"""
	Parse the command line, send one test packet and verify what is seen.

	A Sniffer is created for each of --recvif (request, or TCP SYN with
	--tcpsyn), --replyif (echo reply) and --checkdup (duplicate count)
	that was given, before the packet is sent. Afterwards each sniffer is
	joined and its result checked.

	Exits with status 1 as soon as a requested check fails. When every
	requested check passes (or none was requested) the function returns
	normally, so the script exits with status 0.
	"""
	parser = argparse.ArgumentParser("pft_ping.py",
		description="Ping test tool")
	parser.add_argument('--sendif', nargs=1,
		required=True,
		help='The interface through which the packet(s) will be sent')
	parser.add_argument('--recvif', nargs=1,
		help='The interface on which to expect the ICMP echo request')
	parser.add_argument('--replyif', nargs=1,
		help='The interface on which to expect the ICMP echo response')
	parser.add_argument('--checkdup', nargs=1,
		help='The interface on which to expect the duplicated ICMP packets')
	parser.add_argument('--ip6', action='store_true',
		help='Use IPv6')
	parser.add_argument('--to', nargs=1,
		required=True,
		help='The destination IP address for the ICMP echo request')
	parser.add_argument('--fromaddr', nargs=1,
		help='The source IP address for the ICMP echo request')

	# TCP options
	parser.add_argument('--tcpsyn', action='store_true',
			help='Send a TCP SYN packet')
	parser.add_argument('--tcpopt_unaligned', action='store_true',
			help='Include unaligned TCP options')

	# Packet settings
	parser.add_argument('--send-tos', nargs=1,
		help='Set the ToS value for the transmitted packet')
	parser.add_argument('--send-tc', nargs=1,
		help='Set the traffic class value for the transmitted packet')

	# Expectations
	parser.add_argument('--expect-tos', nargs=1,
		help='The expected ToS value in the received packet')
	parser.add_argument('--expect-tc', nargs=1,
		help='The expected traffic class value in the received packet')

	args = parser.parse_args()

	# We may not have a default route. Tell scapy where to start looking for routes
	sp.conf.iface6 = args.sendif[0]

	# Create all sniffers before anything is sent.
	sniffer = None
	if args.recvif is not None:
		checkfn = check_tcpsyn if args.tcpsyn else check_ping_request
		sniffer = Sniffer(args, checkfn)

	replysniffer = None
	if args.replyif is not None:
		replysniffer = Sniffer(args, check_ping_reply, recvif=args.replyif[0])

	dupsniffer = None
	if args.checkdup is not None:
		dupsniffer = Sniffer(args, check_dup, recvif=args.checkdup[0])

	if args.tcpsyn:
		tcpsyn(args.sendif[0], args.to[0], args)
	elif args.ip6:
		ping6(args.sendif[0], args.to[0], args)
	else:
		ping(args.sendif[0], args.to[0], args)

	if dupsniffer:
		dupsniffer.join()
		# Exactly one matching packet must have been counted.
		if dup_found != 1:
			sys.exit(1)

	# A successful request check must not end the run: the reply check,
	# when requested, has to pass as well.
	if sniffer:
		sniffer.join()
		if not sniffer.foundCorrectPacket:
			sys.exit(1)

	if replysniffer:
		replysniffer.join()
		if not replysniffer.foundCorrectPacket:
			sys.exit(1)
359
360if __name__ == '__main__':
361	main()
362