1#!/usr/bin/env python 2#- 3# SPDX-License-Identifier: BSD-2-Clause 4# 5# Copyright (c) 2020 Netflix, Inc. 6# 7# Redistribution and use in source and binary forms, with or without 8# modification, are permitted provided that the following conditions 9# are met: 10# 1. Redistributions of source code must retain the above copyright 11# notice, this list of conditions and the following disclaimer. 12# 2. Redistributions in binary form must reproduce the above copyright 13# notice, this list of conditions and the following disclaimer in the 14# documentation and/or other materials provided with the distribution. 15# 16# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND 17# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 18# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 19# ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE 20# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL 21# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS 22# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) 23# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 24# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY 25# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 26# SUCH DAMAGE. 
#
#

"""Check kernel logging of listen-queue overflows.

For an IPv4 TCP socket, an IPv6 TCP socket and a Unix-domain stream socket,
the script opens more connections than the listener accepts, then looks at
the output of /sbin/dmesg for the expected "Listen queue overflow" message.
It then triggers one more overflow and verifies that the same message does
not show up a second time.  The exit status is 0 when every check passes and
1 otherwise; failures are described on stderr.
"""

import socket
import os
import sys
from subprocess import check_output
from time import sleep

V4HOST = '127.0.0.1'
V6HOST = '::1'
TCPPORT = 65432
UNIXSOCK = '/tmp/testsock'
TYPE = socket.SOCK_STREAM


class GenericTest(object):
    """Listening socket plus the client sockets connected to it.

    Subclasses provide __init__ and call setup() with the address family
    and address to listen on.
    """

    def __init__(self):
        raise NotImplementedError("Subclass must override the __init__ method")

    def setup(self, af, addr):
        """Create the listening socket.

        af   -- socket address family (e.g. socket.AF_INET)
        addr -- address to bind to, in the form expected for that family

        Raises socket.error if the socket cannot be created, bound or put
        into the listening state.
        """
        # Initialise the bookkeeping first so close() works even when one of
        # the socket calls below raises.
        self.sockets = []
        self.ls = None
        self.ls = socket.socket(af, TYPE)
        self.ls.bind(addr)
        self.ls.listen(2)
        self.af = af
        self.addr = addr

    def doTest(self, cnt):
        """Try to open cnt connections to the listener.

        Connections are never accepted, so they pile up in the listen queue.
        Successfully connected sockets are kept open in self.sockets.

        Returns the number of connections that succeeded.
        """
        rv = 0
        for _ in range(cnt):
            try:
                s = socket.socket(self.af, TYPE)
            except socket.error:
                continue
            try:
                s.connect(self.addr)
            except socket.error:
                # A failed attempt is expected here; release the descriptor
                # right away instead of waiting for garbage collection.
                s.close()
                continue
            self.sockets.append(s)
            rv += 1
        return rv

    def close(self):
        """Close every client socket and the listening socket.

        Safe to call more than once, and safe on an instance on which
        setup() never ran.
        """
        for s in getattr(self, 'sockets', []):
            s.close()
        self.sockets = []
        ls = getattr(self, 'ls', None)
        if ls is not None:
            ls.close()
        self.ls = None

    def __del__(self):
        self.close()


class IPv4Test(GenericTest):
    """Listener on V4HOST:TCPPORT."""

    def __init__(self):
        super(IPv4Test, self).setup(socket.AF_INET, (V4HOST, TCPPORT))


class IPv6Test(GenericTest):
    """Listener on [V6HOST]:TCPPORT."""

    def __init__(self):
        super(IPv6Test, self).setup(socket.AF_INET6, (V6HOST, TCPPORT))


class UnixTest(GenericTest):
    """Listener on the Unix-domain socket UNIXSOCK."""

    def __init__(self):
        super(UnixTest, self).setup(socket.AF_UNIX, UNIXSOCK)

    def close(self):
        """Close the sockets, then remove the socket file.

        The inherited __del__ calls this method, so the file is still
        removed when the object is garbage collected.
        """
        super(UnixTest, self).close()
        try:
            os.remove(UNIXSOCK)
        except OSError:
            # Already removed (e.g. by an earlier close()) or never created.
            pass


class LogChecker(object):
    """Watches the output of /sbin/dmesg for newly appearing messages."""

    def __init__(self):
        # Clear the dmesg buffer so that buffer rotation does not cause
        # issues while the test is running.
        os.system('/sbin/dmesg -c > /dev/null')
        # Figure out how big the dmesg buffer is.
        self.dmesgOff = len(check_output("/sbin/dmesg"))

    @staticmethod
    def _logContains(raw, expected):
        """Return True if any single line of raw contains expected.

        raw      -- dmesg output, normally bytes as returned by check_output
        expected -- text to look for
        """
        if isinstance(raw, bytes):
            # Undecodable bytes are replaced rather than raising, so odd
            # characters elsewhere in the log cannot hide a matching line.
            raw = raw.decode('utf-8', 'replace')
        for line in raw.splitlines():
            if expected in line:
                return True
        return False

    def checkForMsg(self, expected):
        """Look for expected in the dmesg output added since the last call.

        The log is read up to three times, half a second apart.  The
        remembered offset is advanced to the end of the last output read,
        whether or not the message was found.

        Returns True if the message was seen, False otherwise.
        """
        newOff = self.dmesgOff
        for _ in range(3):
            dmesg = check_output("/sbin/dmesg")
            newOff = len(dmesg)
            # Only skip the part already examined when the output has not
            # become shorter than the remembered offset; otherwise scan all
            # of it.
            if newOff >= self.dmesgOff:
                dmesg = dmesg[self.dmesgOff:]
            if self._logContains(dmesg, expected):
                self.dmesgOff = newOff
                return True
            sleep(0.5)
        self.dmesgOff = newOff
        return False


def _checkOverflowLogging(test, lc, label, msg):
    """Run the overflow check for one listener; return True on success.

    test  -- GenericTest instance to connect to
    lc    -- LogChecker used to inspect the log
    label -- name used in the stderr diagnostics
    msg   -- log text expected after the first overflow
    """
    test.doTest(5)
    if not lc.checkForMsg(msg):
        sys.stderr.write("%s log message not seen\n" % label)
        return False
    # One more connection attempt must not produce the message again.
    test.doTest(1)
    if lc.checkForMsg(msg):
        sys.stderr.write("Subsequent %s log message not suppressed\n" % label)
        return False
    return True


def main():
    """Run all three checks and exit with status 0 (pass) or 1 (fail)."""
    STDLOGMSG = "Listen queue overflow: 4 already in queue awaiting acceptance (1 occurrences)"

    tests = []
    failure = False
    try:
        ip4 = IPv4Test()
        tests.append(ip4)
        ip6 = IPv6Test()
        tests.append(ip6)
        lcl = UnixTest()
        tests.append(lcl)
        # Created after the listeners, as the log is cleared at this point.
        lc = LogChecker()

        cases = (
            (ip4, "IPv4",
             "(%s:%d (proto 6)): %s" % (V4HOST, TCPPORT, STDLOGMSG)),
            (ip6, "IPv6",
             "([%s]:%d (proto 6)): %s" % (V6HOST, TCPPORT, STDLOGMSG)),
            (lcl, "Unix socket",
             "(local:%s): %s" % (UNIXSOCK, STDLOGMSG)),
        )
        # Every case runs even if an earlier one failed.
        for test, label, msg in cases:
            if not _checkOverflowLogging(test, lc, label, msg):
                failure = True
    finally:
        # Release sockets (and the Unix socket file) deterministically
        # instead of relying on garbage collection.
        for test in tests:
            test.close()

    sys.exit(1 if failure else 0)


if __name__ == '__main__':
    main()