xref: /freebsd/tests/sys/netinet6/frag6/sniffer.py (revision d0b2dbfa0ecf2bbc9709efc5e20baf8e4b44bbbf)
1f74e6e49SBjoern A. Zeeb
2f74e6e49SBjoern A. Zeebimport threading
3*a26e895fSKristof Provostimport logging
4*a26e895fSKristof Provostlogging.getLogger("scapy").setLevel(logging.CRITICAL)
5f74e6e49SBjoern A. Zeebimport scapy.all as sp
6f74e6e49SBjoern A. Zeeb
7f74e6e49SBjoern A. Zeebclass Sniffer(threading.Thread):
8f74e6e49SBjoern A. Zeeb	def __init__(self, args, check_function):
9f74e6e49SBjoern A. Zeeb		threading.Thread.__init__(self)
10f74e6e49SBjoern A. Zeeb
11f74e6e49SBjoern A. Zeeb		self._args = args
12f74e6e49SBjoern A. Zeeb		self._recvif = args.recvif[0]
13f74e6e49SBjoern A. Zeeb		self._check_function = check_function
14f74e6e49SBjoern A. Zeeb		self.foundCorrectPacket = False
15f74e6e49SBjoern A. Zeeb		self._endme = False
16f74e6e49SBjoern A. Zeeb
17f74e6e49SBjoern A. Zeeb		self.start()
18f74e6e49SBjoern A. Zeeb
19f74e6e49SBjoern A. Zeeb	def _checkPacket(self, packet):
20f74e6e49SBjoern A. Zeeb		ret = self._check_function(self._args, packet)
21f74e6e49SBjoern A. Zeeb		if ret:
22f74e6e49SBjoern A. Zeeb			self.foundCorrectPacket = True
23f74e6e49SBjoern A. Zeeb		return ret
24f74e6e49SBjoern A. Zeeb
25f74e6e49SBjoern A. Zeeb	def setEnd(self):
26f74e6e49SBjoern A. Zeeb		self._endme = True
27f74e6e49SBjoern A. Zeeb
28f74e6e49SBjoern A. Zeeb	def stopFilter(self, pkt):
29f74e6e49SBjoern A. Zeeb		if pkt is not None:
30f74e6e49SBjoern A. Zeeb			self._checkPacket(pkt)
31f74e6e49SBjoern A. Zeeb		if self.foundCorrectPacket or self._endme:
32f74e6e49SBjoern A. Zeeb			return True
33f74e6e49SBjoern A. Zeeb		else:
34f74e6e49SBjoern A. Zeeb			return False
35f74e6e49SBjoern A. Zeeb
36f74e6e49SBjoern A. Zeeb	def run(self):
37f74e6e49SBjoern A. Zeeb		while True:
38f74e6e49SBjoern A. Zeeb			self.packets = sp.sniff(iface=self._recvif, store=False,
39f74e6e49SBjoern A. Zeeb				stop_filter=self.stopFilter, timeout=90)
40f74e6e49SBjoern A. Zeeb			if self.stopFilter(None):
41f74e6e49SBjoern A. Zeeb				break
42