xref: /freebsd/tests/sys/netpfil/common/pft_ping.py (revision ac099daf6742ead81ea7ea86351a8ef4e783041b)
1#!/usr/bin/env python3
2#
3# SPDX-License-Identifier: BSD-2-Clause
4#
5# Copyright (c) 2017 Kristof Provost <kp@FreeBSD.org>
6#
7# Redistribution and use in source and binary forms, with or without
8# modification, are permitted provided that the following conditions
9# are met:
10# 1. Redistributions of source code must retain the above copyright
11#    notice, this list of conditions and the following disclaimer.
12# 2. Redistributions in binary form must reproduce the above copyright
13#    notice, this list of conditions and the following disclaimer in the
14#    documentation and/or other materials provided with the distribution.
15#
16# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
17# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19# ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
20# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
21# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
22# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
23# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
24# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
25# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
26# SUCH DAMAGE.
27#
28
29import argparse
30import scapy.all as sp
31import socket
32import sys
33from sniffer import Sniffer
34
# Marker bytes carried as the payload of every echo request this tool sends;
# the check functions compare captured payloads against it.
PAYLOAD_MAGIC = b'\x42\xc0\xff\xee'

# Incremented by check_dup() for every captured ICMP packet carrying the marker.
dup_found = 0
38
def check_dup(args, packet):
	"""
	Count captured ICMP packets that carry our payload marker

	Each packet that has an ICMP layer and a Raw payload equal to
	PAYLOAD_MAGIC bumps the module-level dup_found counter. args is not
	used. The return value is always False, matched or not (presumably so
	the sniffer never treats a single packet as the final answer - confirm
	against the Sniffer class).
	"""
	global dup_found

	if not packet.getlayer(sp.ICMP):
		return False

	payload = packet.getlayer(sp.Raw)
	if not payload or payload.load != PAYLOAD_MAGIC:
		return False

	dup_found += 1
	return False
57
def check_ping_request(args, packet):
	"""
	Check an echo request with the IPv6 checker when args.ip6 is set, and
	with the IPv4 checker otherwise
	"""
	checker = check_ping6_request if args.ip6 else check_ping4_request
	return checker(args, packet)
63
def check_ping4_request(args, packet):
	"""
	Verify that the packet matches what we'd have sent

	Returns True only for an IPv4 ICMP echo request addressed to args.to[0]
	whose payload is PAYLOAD_MAGIC and, when args.expect_tos is set, whose
	ToS field equals that value. Returns False for anything else; a ToS
	mismatch is additionally reported on stdout.
	"""
	dst_ip = args.to[0]

	ip = packet.getlayer(sp.IP)
	if not ip:
		return False
	if ip.dst != dst_ip:
		return False

	icmp = packet.getlayer(sp.ICMP)
	if not icmp:
		return False
	# Look the type name up with .get(): indexing the table with a type it
	# does not list would raise KeyError, and unrelated ICMP traffic on the
	# interface must simply be rejected.
	if sp.icmptypes.get(icmp.type) != 'echo-request':
		return False

	raw = packet.getlayer(sp.Raw)
	if not raw:
		return False
	if raw.load != PAYLOAD_MAGIC:
		return False

	# Wait to check expectations until we've established this is the packet we
	# sent.
	if args.expect_tos:
		expected_tos = int(args.expect_tos[0])
		if ip.tos != expected_tos:
			print("Unexpected ToS value %d, expected %d" \
				% (ip.tos, expected_tos))
			return False

	return True
97
def check_ping6_request(args, packet):
	"""
	Verify that the packet is the IPv6 echo request we'd have sent

	True only when the packet has an IPv6 layer destined for args.to[0] and
	an ICMPv6 echo request layer whose data equals PAYLOAD_MAGIC.
	"""
	target = args.to[0]

	ip6 = packet.getlayer(sp.IPv6)
	if not ip6 or ip6.dst != target:
		return False

	echo = packet.getlayer(sp.ICMPv6EchoRequest)
	if not echo or echo.data != PAYLOAD_MAGIC:
		return False

	return True
117
def check_ping_reply(args, packet):
	"""
	Check an echo reply with the IPv6 checker when args.ip6 is set, and
	with the IPv4 checker otherwise
	"""
	checker = check_ping6_reply if args.ip6 else check_ping4_reply
	return checker(args, packet)
123
def check_ping4_reply(args, packet):
	"""
	Check that this is a reply to the ping request we sent

	Returns True only for an IPv4 ICMP echo reply whose source address is
	args.to[0] (the host we pinged) and whose payload is PAYLOAD_MAGIC.
	Returns False for anything else.
	"""
	dst_ip = args.to[0]

	ip = packet.getlayer(sp.IP)
	if not ip:
		return False
	# The reply comes back *from* the address we pinged.
	if ip.src != dst_ip:
		return False

	icmp = packet.getlayer(sp.ICMP)
	if not icmp:
		return False
	# Look the type name up with .get(): indexing the table with a type it
	# does not list would raise KeyError, and unrelated ICMP traffic on the
	# interface must simply be rejected.
	if sp.icmptypes.get(icmp.type) != 'echo-reply':
		return False

	raw = packet.getlayer(sp.Raw)
	if not raw:
		return False
	if raw.load != PAYLOAD_MAGIC:
		return False

	return True
149
def check_ping6_reply(args, packet):
	"""
	Check that this is the IPv6 reply to the ping request we sent

	True only when the packet has an IPv6 layer whose source is args.to[0]
	and an ICMPv6 echo reply layer whose data equals PAYLOAD_MAGIC. Once the
	source address matches, a missing echo reply layer or a data mismatch is
	reported on stdout before returning False.
	"""
	target = args.to[0]

	ip6 = packet.getlayer(sp.IPv6)
	if not ip6 or ip6.src != target:
		return False

	echo_reply = packet.getlayer(sp.ICMPv6EchoReply)
	if not echo_reply:
		print("No echo reply!")
		return False

	if echo_reply.data != PAYLOAD_MAGIC:
		print("data mismatch")
		return False

	return True
172
def ping(send_if, dst_ip, args):
	"""
	Send a single IPv4 ICMP echo request carrying PAYLOAD_MAGIC

	send_if is the interface to transmit on and dst_ip the destination
	address. When set, args.send_tos[0] becomes the ToS value and
	args.fromaddr[0] the source address of the IP header.
	"""
	ip_hdr = sp.IP(dst=dst_ip)
	if args.send_tos:
		ip_hdr.tos = int(args.send_tos[0])
	if args.fromaddr:
		ip_hdr.src = args.fromaddr[0]

	echo = sp.ICMP(type='echo-request')
	payload = sp.raw(PAYLOAD_MAGIC)

	frame = sp.Ether() / ip_hdr / echo / payload
	sp.sendp(frame, iface=send_if, verbose=False)
187
def ping6(send_if, dst_ip, args):
	"""
	Send a single ICMPv6 echo request carrying PAYLOAD_MAGIC

	send_if is the interface to transmit on and dst_ip the IPv6 destination
	address. When args.fromaddr is set, its first element is used as the
	source address. args.send_tos is not consulted here.
	"""
	ether = sp.Ether()
	ip6 = sp.IPv6(dst=dst_ip)
	icmp = sp.ICMPv6EchoRequest(data=sp.raw(PAYLOAD_MAGIC))

	if args.fromaddr:
		# Set the source on the IPv6 header built above; the previous code
		# assigned to an undefined name and raised NameError.
		ip6.src = args.fromaddr[0]

	req = ether / ip6 / icmp
	sp.sendp(req, iface=send_if, verbose=False)
198
def check_tcpsyn(args, packet):
	"""
	Verify that the packet is a TCP packet to our target with valid checksums

	Returns True only when the packet has an IPv4 layer destined for
	args.to[0], a TCP layer, and both the IP and TCP checksums found in the
	packet equal the values scapy computes when rebuilding it. A checksum
	mismatch is reported on stdout before returning False.

	Note that the checksum fields of the captured packet are cleared as
	part of the recomputation and are not restored.
	"""
	dst_ip = args.to[0]

	ip = packet.getlayer(sp.IP)
	if not ip:
		return False
	if ip.dst != dst_ip:
		return False

	tcp = packet.getlayer(sp.TCP)
	if not tcp:
		return False

	# Verify IP checksum: clear the field so that serialising the layer
	# makes scapy fill in a fresh checksum, then parse that back out.
	chksum = ip.chksum
	ip.chksum = None
	new_chksum = sp.IP(sp.raw(ip)).chksum
	if chksum != new_chksum:
		print("Expected IP checksum %x but found %x\n" % (new_chksum, chksum))
		return False

	# Verify TCP checksum: same approach, but rebuild from the Ethernet
	# layer down and read the checksum of the re-parsed TCP layer.
	chksum = tcp.chksum
	tcp.chksum = None
	newpacket = sp.Ether(sp.raw(packet[sp.Ether]))
	new_chksum = newpacket[sp.TCP].chksum
	if chksum != new_chksum:
		print("Expected TCP checksum %x but found %x\n" % (new_chksum, chksum))
		return False

	return True
231
def tcpsyn(send_if, dst_ip, args):
	"""
	Send a single TCP SYN to port 666 of dst_ip through send_if

	The segment carries a Timestamp and an MSS option. When
	args.tcpopt_unaligned is set, a NOP option is placed in front of them.
	"""
	tcp_options = [('Timestamp', (1, 1)), ('MSS', 1280)]
	if args.tcpopt_unaligned:
		tcp_options.insert(0, ('NOP', 0))

	syn = sp.TCP(dport=666, flags='S', options=tcp_options)
	frame = sp.Ether() / sp.IP(dst=dst_ip) / syn
	sp.sendp(frame, iface=send_if, verbose=False)
244
245
def main():
	"""
	Parse the command line, start the requested sniffers, send one probe
	packet and exit with a status describing what was captured

	Exit status, in order of evaluation:
	- with --checkdup: 1 unless exactly one marked ICMP packet was counted;
	- with --recvif: 0 if that sniffer found the expected packet, else 1
	  (the --replyif sniffer is not consulted in this case);
	- with --replyif: 0 if that sniffer found the expected reply, else 1.
	If none of these terminates the process, main() simply returns.
	"""
	parser = argparse.ArgumentParser("pft_ping.py",
		description="Ping test tool")
	parser.add_argument('--sendif', nargs=1, required=True,
		help='The interface through which the packet(s) will be sent')
	parser.add_argument('--recvif', nargs=1,
		help='The interface on which to expect the ICMP echo request')
	parser.add_argument('--replyif', nargs=1,
		help='The interface on which to expect the ICMP echo response')
	parser.add_argument('--checkdup', nargs=1,
		help='The interface on which to expect the duplicated ICMP packets')
	parser.add_argument('--ip6', action='store_true',
		help='Use IPv6')
	parser.add_argument('--to', nargs=1, required=True,
		help='The destination IP address for the ICMP echo request')
	parser.add_argument('--fromaddr', nargs=1,
		help='The source IP address for the ICMP echo request')

	# TCP options
	parser.add_argument('--tcpsyn', action='store_true',
		help='Send a TCP SYN packet')
	parser.add_argument('--tcpopt_unaligned', action='store_true',
		help='Include unaligned TCP options')

	# Packet settings
	parser.add_argument('--send-tos', nargs=1,
		help='Set the ToS value for the transmitted packet')

	# Expectations
	parser.add_argument('--expect-tos', nargs=1,
		help='The expected ToS value in the received packet')

	args = parser.parse_args()
	send_if = args.sendif[0]
	dst_ip = args.to[0]

	# We may not have a default route. Tell scapy where to start looking for routes
	sp.conf.iface6 = send_if

	# Create every sniffer before anything is transmitted.
	sniffer = None
	if args.recvif is not None:
		request_check = check_tcpsyn if args.tcpsyn else check_ping_request
		sniffer = Sniffer(args, request_check)

	replysniffer = None
	if args.replyif is not None:
		replysniffer = Sniffer(args, check_ping_reply, recvif=args.replyif[0])

	dupsniffer = None
	if args.checkdup is not None:
		dupsniffer = Sniffer(args, check_dup, recvif=args.checkdup[0])

	# Send exactly one probe of the requested kind.
	if args.tcpsyn:
		tcpsyn(send_if, dst_ip, args)
	elif args.ip6:
		ping6(send_if, dst_ip, args)
	else:
		ping(send_if, dst_ip, args)

	if dupsniffer:
		dupsniffer.join()
		if dup_found != 1:
			sys.exit(1)

	if sniffer:
		sniffer.join()
		sys.exit(0 if sniffer.foundCorrectPacket else 1)

	if replysniffer:
		replysniffer.join()
		sys.exit(0 if replysniffer.foundCorrectPacket else 1)
330
# Run the tool only when executed as a script, not when imported.
if __name__ == "__main__":
	main()
333