#!/usr/bin/env python
#-
# SPDX-License-Identifier: BSD-2-Clause
#
# Copyright (c) 2020 Netflix, Inc.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
# 1. Redistributions of source code must retain the above copyright
#    notice, this list of conditions and the following disclaimer.
# 2. Redistributions in binary form must reproduce the above copyright
#    notice, this list of conditions and the following disclaimer in the
#    documentation and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
# SUCH DAMAGE.
#
# $FreeBSD$
#

# Listen-queue overflow logging test.
#
# For IPv4, IPv6 and a Unix-domain socket this script opens a listening
# socket that never accepts, makes several client connections to it, and
# then looks in the kernel message buffer (/sbin/dmesg) for the expected
# "Listen queue overflow" line.  It then makes one more connection and
# checks that the same line is NOT logged a second time.
# Exit status is 0 when every check passes and 1 otherwise.

import socket
import os
import sys
from subprocess import check_output
from time import sleep

V4HOST = '127.0.0.1'
V6HOST = '::1'
TCPPORT = 65432
UNIXSOCK = '/tmp/testsock'
TYPE = socket.SOCK_STREAM


class GenericTest(object):
    """Base class holding one listening socket plus its client sockets.

    Subclasses provide ``__init__`` and call ``setup()`` with the address
    family and address to listen on.
    """

    def __init__(self):
        raise NotImplementedError("Subclass must override the __init__ method")

    def setup(self, af, addr):
        """Create, bind and listen on a socket of family ``af`` at ``addr``.

        The listener is never accepted from; client sockets created by
        ``doTest()`` are kept in ``self.sockets`` until the object dies.
        """
        # Initialise every attribute before any call that can raise, so
        # __del__ always finds a consistent object.
        self.sockets = []
        self.ls = None
        self.af = af
        self.addr = addr
        self.ls = socket.socket(af, TYPE)
        self.ls.bind(addr)
        self.ls.listen(2)

    def doTest(self, cnt):
        """Attempt ``cnt`` connections to the listener.

        Successful client sockets are retained (left open) in
        ``self.sockets``.  Returns the number of connections that succeeded.
        """
        rv = 0
        for _ in range(cnt):
            s = None
            try:
                s = socket.socket(self.af, TYPE)
                s.connect(self.addr)
            except socket.error:
                # A failed attempt is simply not counted; release the
                # descriptor instead of leaving it to the garbage collector.
                if s is not None:
                    s.close()
                continue
            self.sockets.append(s)
            rv += 1
        return rv

    def __del__(self):
        # getattr() guards against a partially constructed object (for
        # example when __init__ raised before setup() ran).
        for s in getattr(self, 'sockets', []):
            s.close()
        ls = getattr(self, 'ls', None)
        if ls is not None:
            ls.close()


class IPv4Test(GenericTest):
    """Listener on the IPv4 loopback address."""

    def __init__(self):
        super(IPv4Test, self).setup(socket.AF_INET, (V4HOST, TCPPORT))


class IPv6Test(GenericTest):
    """Listener on the IPv6 loopback address."""

    def __init__(self):
        super(IPv6Test, self).setup(socket.AF_INET6, (V6HOST, TCPPORT))


class UnixTest(GenericTest):
    """Listener on the Unix-domain socket path UNIXSOCK."""

    def __init__(self):
        super(UnixTest, self).setup(socket.AF_UNIX, UNIXSOCK)

    def __del__(self):
        super(UnixTest, self).__del__()
        try:
            os.remove(UNIXSOCK)
        except OSError:
            # The socket file does not exist if bind() never succeeded.
            pass


class LogChecker(object):
    """Watches the output of /sbin/dmesg for newly logged messages."""

    def __init__(self):
        # Clear the dmesg buffer so that buffer rotation does not cause
        # issues with the offset tracking below.
        os.system('/sbin/dmesg -c > /dev/null')
        # Figure out how big the dmesg buffer is.
        self.dmesgOff = len(check_output("/sbin/dmesg"))

    @staticmethod
    def _containsMsg(data, expected):
        """Return True if any line of ``data`` contains ``expected``.

        ``data`` is the raw output of dmesg.  check_output() returns bytes,
        so it is decoded once here rather than matching against the
        ``b'...'`` repr of each line.
        """
        if isinstance(data, bytes):
            data = data.decode('utf-8', 'replace')
        return any(expected in line for line in data.splitlines())

    def checkForMsg(self, expected):
        """Poll dmesg up to three times for a new line containing ``expected``.

        Only output past the previously recorded offset is examined.
        Returns True as soon as the text is found, False after three
        attempts spaced 0.5 seconds apart.  The recorded offset is advanced
        to the current output length in both cases.
        """
        newOff = self.dmesgOff
        for _ in range(3):
            dmesg = check_output("/sbin/dmesg")
            newOff = len(dmesg)
            # If the output shrank, the saved offset no longer points into
            # it, so this sample is skipped.
            if newOff >= self.dmesgOff and \
                    self._containsMsg(dmesg[self.dmesgOff:], expected):
                self.dmesgOff = newOff
                return True
            sleep(0.5)
        self.dmesgOff = newOff
        return False


def main():
    """Run the IPv4, IPv6 and Unix-socket checks, then exit with 0 or 1."""
    ip4 = IPv4Test()
    ip6 = IPv6Test()
    lcl = UnixTest()
    lc = LogChecker()

    STDLOGMSG = "Listen queue overflow: 4 already in queue awaiting acceptance (1 occurrences)"

    # (label used in diagnostics, test object, expected log text)
    cases = (
        ("IPv4", ip4,
         "(%s:%d (proto 6)): %s" % (V4HOST, TCPPORT, STDLOGMSG)),
        ("IPv6", ip6,
         "([%s]:%d (proto 6)): %s" % (V6HOST, TCPPORT, STDLOGMSG)),
        ("Unix socket", lcl,
         "(local:%s): %s" % (UNIXSOCK, STDLOGMSG)),
    )

    failure = False
    for label, test, logmsg in cases:
        # Five connections against the never-accepting listener are expected
        # to produce the overflow message.
        test.doTest(5)
        if not lc.checkForMsg(logmsg):
            failure = True
            sys.stderr.write("%s log message not seen\n" % label)
            continue
        # One further connection must not produce the message again.
        test.doTest(1)
        if lc.checkForMsg(logmsg):
            failure = True
            sys.stderr.write(
                "Subsequent %s log message not suppressed\n" % label)

    sys.exit(1 if failure else 0)


if __name__ == '__main__':
    main()