#!/usr/bin/env python
#-
# SPDX-License-Identifier: BSD-2-Clause
#
# Copyright (c) 2020 Netflix, Inc.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
# 1. Redistributions of source code must retain the above copyright
#    notice, this list of conditions and the following disclaimer.
# 2. Redistributions in binary form must reproduce the above copyright
#    notice, this list of conditions and the following disclaimer in the
#    documentation and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
# SUCH DAMAGE.
#
# $FreeBSD$
#

import errno
import os
import socket
import sys
from subprocess import check_output
from time import sleep

V4HOST = '127.0.0.1'
V6HOST = '::1'
TCPPORT = 65432
UNIXSOCK = '/tmp/testsock'
TYPE = socket.SOCK_STREAM


class GenericTest(object):
    """A listening socket plus the client connections opened against it.

    The listening socket is created by setup() with a backlog of 2 and is
    never accept()ed from, so every connection made by doTest() stays in
    its queue.  Subclasses pick the address family and address by calling
    setup() from their own __init__.
    """

    def __init__(self):
        raise NotImplementedError("Subclass must override the __init__ method")

    def setup(self, af, addr):
        """Create, bind and listen on a TYPE socket for (af, addr).

        The bookkeeping attributes are assigned before the socket calls so
        that close() works even when socket(), bind() or listen() raises.
        """
        self.sockets = []
        self.ls = None
        self.af = af
        self.addr = addr
        self.ls = socket.socket(af, TYPE)
        self.ls.bind(addr)
        self.ls.listen(2)

    def doTest(self, cnt):
        """Try to open cnt connections to the listening socket.

        Successful connections are kept open in self.sockets.  A connection
        attempt that fails with a socket error is skipped, and its socket is
        closed instead of being left for the garbage collector.

        Returns the number of connections that succeeded.
        """
        rv = 0
        for _ in range(cnt):
            s = None
            try:
                s = socket.socket(self.af, TYPE)
                s.connect(self.addr)
            except socket.error:
                # Only socket-level failures are expected here; anything
                # else (e.g. KeyboardInterrupt) must propagate.
                if s is not None:
                    s.close()
                continue
            self.sockets.append(s)
            rv += 1
        return rv

    def close(self):
        """Close every client socket and the listening socket.

        Safe to call more than once, and safe on an instance whose setup()
        never ran.
        """
        if getattr(self, '_closed', False):
            return
        self._closed = True
        for s in getattr(self, 'sockets', []):
            s.close()
        self.sockets = []
        ls = getattr(self, 'ls', None)
        if ls is not None:
            ls.close()
        self.ls = None

    def __del__(self):
        # Fallback only; main() calls close() explicitly.
        self.close()


class IPv4Test(GenericTest):
    """Listener on (V4HOST, TCPPORT)."""

    def __init__(self):
        super(IPv4Test, self).setup(socket.AF_INET, (V4HOST, TCPPORT))


class IPv6Test(GenericTest):
    """Listener on (V6HOST, TCPPORT)."""

    def __init__(self):
        super(IPv6Test, self).setup(socket.AF_INET6, (V6HOST, TCPPORT))


class UnixTest(GenericTest):
    """Listener on the Unix-domain socket path UNIXSOCK."""

    def __init__(self):
        # A socket file left behind by an earlier, aborted run would make
        # bind() fail, so clear it first.
        self._removeSocketFile()
        super(UnixTest, self).setup(socket.AF_UNIX, UNIXSOCK)

    @staticmethod
    def _removeSocketFile():
        """Remove UNIXSOCK; a file that is already gone is not an error."""
        try:
            os.remove(UNIXSOCK)
        except OSError as e:
            if e.errno != errno.ENOENT:
                raise

    def close(self):
        """Close all sockets, then remove the socket file."""
        if getattr(self, '_closed', False):
            return
        super(UnixTest, self).close()
        self._removeSocketFile()

    def __del__(self):
        self.close()


class LogChecker(object):
    """Looks for new messages in the output of /sbin/dmesg."""

    def __init__(self):
        # Figure out how big the dmesg buffer is.
        self.dmesgOff = len(check_output("/sbin/dmesg"))

    def checkForMsg(self, expected):
        """Return True if expected appears in dmesg output added since the
        last check.

        The output is polled up to three times, 0.5 seconds apart.  When the
        output is at least as long as the remembered offset only the part
        past that offset is searched; otherwise all of it is searched.  The
        offset is advanced to the length of the last output read.
        """
        newOff = self.dmesgOff
        for _ in range(3):
            dmesg = check_output("/sbin/dmesg")
            newOff = len(dmesg)
            if newOff >= self.dmesgOff:
                dmesg = dmesg[self.dmesgOff:]
            for raw in dmesg.splitlines():
                # check_output() returns bytes; decode each line rather than
                # searching its repr, and tolerate undecodable bytes.
                if expected in raw.decode('utf-8', 'replace'):
                    self.dmesgOff = newOff
                    return True
            sleep(0.5)
        self.dmesgOff = newOff
        return False


def _runCase(label, test, lc, logMsg):
    """Run one overflow check; return True on success.

    Opens 5 connections and expects logMsg to be logged, then opens one more
    and expects logMsg not to be logged again.  Failures are reported on
    stderr using label.
    """
    test.doTest(5)
    if not lc.checkForMsg(logMsg):
        sys.stderr.write("%s log message not seen\n" % label)
        return False
    test.doTest(1)
    if lc.checkForMsg(logMsg):
        sys.stderr.write("Subsequent %s log message not suppressed\n" % label)
        return False
    return True


def main():
    """Run the IPv4, IPv6 and Unix-socket checks; exit 1 if any failed."""
    STDLOGMSG = "Listen queue overflow: 4 already in queue awaiting acceptance (1 occurrences)"

    tests = []
    failure = False
    try:
        # All listeners are created before the log offset is recorded.
        tests.append(IPv4Test())
        tests.append(IPv6Test())
        tests.append(UnixTest())
        ip4, ip6, lcl = tests
        lc = LogChecker()

        cases = (
            ("IPv4", ip4,
             "(%s:%d (proto 6)): %s" % (V4HOST, TCPPORT, STDLOGMSG)),
            ("IPv6", ip6,
             "([%s]:%d (proto 6)): %s" % (V6HOST, TCPPORT, STDLOGMSG)),
            ("Unix socket", lcl,
             "(local:%s): %s" % (UNIXSOCK, STDLOGMSG)),
        )
        for label, test, logMsg in cases:
            if not _runCase(label, test, lc, logMsg):
                failure = True
    finally:
        # Deterministic cleanup instead of relying on finalizers running
        # during interpreter exit.
        for test in tests:
            test.close()

    if failure:
        sys.exit(1)
    sys.exit(0)


if __name__ == '__main__':
    main()