xref: /freebsd/tests/sys/netinet6/frag6/sniffer.py (revision d0b2dbfa0ecf2bbc9709efc5e20baf8e4b44bbbf)
1
2import threading
3import logging
4logging.getLogger("scapy").setLevel(logging.CRITICAL)
5import scapy.all as sp
6
class Sniffer(threading.Thread):
	"""Thread that sniffs one interface until a wanted packet shows up.

	Every captured packet is handed to ``check_function(args, packet)``.
	The first truthy result sets ``foundCorrectPacket`` and ends the
	capture.  The owner can also end it by calling ``setEnd()``.

	The thread starts itself from the constructor, so the capture may
	already be running by the time the constructor returns.
	"""

	def __init__(self, args, check_function):
		"""Store the configuration and start the thread.

		args           -- object whose ``recvif[0]`` is passed to scapy as
		                  the interface to sniff on; it is also forwarded
		                  unchanged as the first argument of
		                  ``check_function``.
		check_function -- callable ``(args, packet)`` returning a truthy
		                  value when ``packet`` is the one being waited for.
		"""
		super(Sniffer, self).__init__()

		self._check_function = check_function
		self._args = args
		self._recvif = args.recvif[0]

		# Both flags are only ever switched from False to True.
		self._endme = False
		self.foundCorrectPacket = False

		# Must stay last: run() reads every attribute assigned above.
		self.start()

	def _checkPacket(self, packet):
		"""Run the check function on ``packet`` and record a match.

		Returns whatever the check function returned.
		"""
		matched = self._check_function(self._args, packet)
		if matched:
			self.foundCorrectPacket = True
		return matched

	def setEnd(self):
		"""Ask the sniffer to stop.

		The flag is only looked at from stopFilter(), i.e. when a packet
		is captured or after a sniff() call has returned.
		"""
		self._endme = True

	def stopFilter(self, pkt):
		"""Tell whether sniffing should stop; usable as scapy stop_filter.

		``pkt`` may be None, in which case no packet is checked and only
		the current state of the two flags is reported.
		"""
		if pkt is not None:
			self._checkPacket(pkt)
		return bool(self.foundCorrectPacket or self._endme)

	def run(self):
		"""Thread body: keep sniffing until a match or a stop request."""
		finished = False
		while not finished:
			# sniff() returns once stopFilter() says so or once its
			# timeout=90 runs out; in the latter case the flags are
			# re-checked and the capture is restarted if neither is set.
			self.packets = sp.sniff(iface=self._recvif, store=False,
				stop_filter=self.stopFilter, timeout=90)
			finished = self.stopFilter(None)
42