#!/usr/local/bin/python3
"""Helpers for building test topologies out of network interfaces and vnet jails.

Everything here shells out to ``ifconfig``, ``jail``, ``jexec`` and ``ndp``.
Names of created interfaces and jails are appended to list files in the
current working directory so that a later cleanup pass can destroy them.
"""
import os
import socket
import time
from ctypes import CDLL
from ctypes import cdll  # noqa: F401 - retained; other code may import it from here
from ctypes import get_errno
from ctypes.util import find_library
from typing import List
from typing import Optional


def run_cmd(cmd: str) -> str:
    """Run *cmd* through the shell, echo it to stdout, and return its output."""
    print("run: '{}'".format(cmd))
    return os.popen(cmd).read()


class VnetInterface(object):
    """A network interface that may be moved into a vnet jail."""

    # File (relative to the CWD) listing every interface created so far.
    INTERFACES_FNAME = "created_interfaces.lst"

    # defines from net/if_types.h
    IFT_LOOP = 0x18
    IFT_ETHER = 0x06

    def __init__(self, iface_name: str):
        """Record the interface name and derive its type from the name prefix."""
        self.name = iface_name
        self.vnet_name = ""
        self.jailed = False
        # Names starting with "lo" are treated as loopback, the rest as Ethernet.
        if iface_name.startswith("lo"):
            self.iftype = self.IFT_LOOP
        else:
            self.iftype = self.IFT_ETHER

    @property
    def ifindex(self) -> int:
        """Return the interface index as resolved by the current process."""
        return socket.if_nametoindex(self.name)

    def set_vnet(self, vnet_name: str):
        """Remember the name of the vnet jail this interface belongs to."""
        self.vnet_name = vnet_name

    def set_jailed(self, jailed: bool):
        """Mark whether commands are already executed from inside the jail."""
        self.jailed = jailed

    def run_cmd(self, cmd: str) -> str:
        """Run *cmd* against this interface and return its output.

        The command is wrapped in ``jexec <vnet>`` only when the interface is
        assigned to a vnet and the caller is not marked as already jailed.
        """
        if self.vnet_name and not self.jailed:
            cmd = "jexec {} {}".format(self.vnet_name, cmd)
        # The original discarded the output; returning it is backward-compatible.
        return run_cmd(cmd)

    @staticmethod
    def file_append_line(line: str):
        """Append *line* to the created-interfaces list file."""
        with open(VnetInterface.INTERFACES_FNAME, "a") as f:
            f.write(line + "\n")

    @classmethod
    def create_iface(cls, iface_name: str):
        """Create an interface with ``ifconfig <name> create`` and wrap it.

        The name printed by ifconfig is recorded for later cleanup. For
        ``epair`` interfaces the peer name (last character replaced with
        ``b``) is recorded as well.

        Raises:
            Exception: if ifconfig printed no interface name.
        """
        name = run_cmd("/sbin/ifconfig {} create".format(iface_name)).rstrip()
        if not name:
            raise Exception("Unable to create iface {}".format(iface_name))
        cls.file_append_line(name)
        if name.startswith("epair"):
            cls.file_append_line(name[:-1] + "b")
        return cls(name)

    @staticmethod
    def cleanup_ifaces():
        """Destroy every recorded interface and remove the list file.

        Best-effort: a missing or unreadable list file is silently ignored.
        """
        try:
            with open(VnetInterface.INTERFACES_FNAME, "r") as f:
                for line in f:
                    iface_name = line.strip()
                    if not iface_name:
                        continue  # skip blank lines instead of running a bogus command
                    run_cmd("/sbin/ifconfig {} destroy".format(iface_name))
            os.unlink(VnetInterface.INTERFACES_FNAME)
        except OSError:
            # Nothing was recorded (or the file cannot be accessed).
            pass

    def setup_addr(self, addr: str):
        """Assign *addr* to the interface; a ``:`` in it selects ``inet6``."""
        family = "inet6" if ":" in addr else "inet"
        cmd = "/sbin/ifconfig {} {} {}".format(self.name, family, addr)
        self.run_cmd(cmd)

    def delete_addr(self, addr: str):
        """Remove *addr* from the interface (IPv6 and IPv4 use different syntax)."""
        if ":" in addr:
            cmd = "/sbin/ifconfig {} inet6 {} delete".format(self.name, addr)
        else:
            cmd = "/sbin/ifconfig {} -alias {}".format(self.name, addr)
        self.run_cmd(cmd)

    def turn_up(self):
        """Bring the interface up."""
        cmd = "/sbin/ifconfig {} up".format(self.name)
        self.run_cmd(cmd)

    def enable_ipv6(self):
        """Run ``ndp -i <iface> -disabled`` for this interface.

        NOTE(review): presumably this clears the per-interface "disabled" flag
        so that IPv6 becomes usable - confirm against ndp(8).
        """
        cmd = "/usr/sbin/ndp -i {} -disabled".format(self.name)
        self.run_cmd(cmd)


class VnetInstance(object):
    """A persistent vnet jail together with the interfaces moved into it."""

    # File (relative to the CWD) listing every jail created so far.
    JAILS_FNAME = "created_jails.lst"

    def __init__(self, vnet_name: str, jid: int, ifaces: List[VnetInterface]):
        """Store the jail identity and mark every interface as owned by it."""
        self.name = vnet_name
        self.jid = jid
        self.ifaces = ifaces
        for iface in ifaces:
            iface.set_vnet(vnet_name)
            iface.set_jailed(True)

    def run_vnet_cmd(self, cmd: str) -> str:
        """Run *cmd* inside this jail via ``jexec`` and return its output."""
        # The jail name is stored in ``self.name``; the original read the
        # non-existent ``self.vnet_name`` and always raised AttributeError.
        if self.name:
            cmd = "jexec {} {}".format(self.name, cmd)
        return run_cmd(cmd)

    @staticmethod
    def wait_interface(vnet_name: str, iface_name: str) -> bool:
        """Poll the jail until *iface_name* shows up in ``ifconfig -l``.

        Tries up to 50 times with a 0.1 s pause between attempts.

        Returns:
            True once the interface is listed, False if it never appeared.
        """
        cmd = "jexec {} /sbin/ifconfig -l".format(vnet_name)
        for _ in range(50):
            # split() without arguments tolerates repeated/odd whitespace.
            ifaces = run_cmd(cmd).split()
            if iface_name in ifaces:
                return True
            time.sleep(0.1)
        return False

    @staticmethod
    def file_append_line(line: str):
        """Append *line* to the created-jails list file."""
        with open(VnetInstance.JAILS_FNAME, "a") as f:
            f.write(line + "\n")

    @staticmethod
    def cleanup_vnets():
        """Remove every recorded jail and delete the list file.

        Best-effort: a missing or unreadable list file is silently ignored.
        """
        try:
            with open(VnetInstance.JAILS_FNAME) as f:
                for line in f:
                    jail_name = line.strip()
                    if not jail_name:
                        continue  # skip blank lines instead of running a bogus command
                    run_cmd("/usr/sbin/jail -r {}".format(jail_name))
            os.unlink(VnetInstance.JAILS_FNAME)
        except OSError:
            # Nothing was recorded (or the file cannot be accessed).
            pass

    @classmethod
    def create_with_interfaces(cls, vnet_name: str, ifaces: List[VnetInterface]):
        """Create a persistent vnet jail owning *ifaces* and wait for them.

        Raises:
            Exception: if ``jail`` did not print a positive jail id, or if an
                interface did not appear inside the jail in time.
        """
        iface_cmds = " ".join("vnet.interface={}".format(i.name) for i in ifaces)
        cmd = "/usr/sbin/jail -i -c name={} persist vnet {}".format(
            vnet_name, iface_cmds
        )
        jid_str = run_cmd(cmd)
        try:
            jid = int(jid_str)
        except ValueError:
            # Empty or non-numeric output means the jail was not created.
            jid = -1
        if jid <= 0:
            # Report the raw command output; the parsed number is useless here.
            raise Exception("Jail creation failed, output: {}".format(jid_str))
        cls.file_append_line(vnet_name)

        for iface in ifaces:
            if cls.wait_interface(vnet_name, iface.name):
                continue
            raise Exception(
                "Interface {} has not appeared in vnet {}".format(iface.name, vnet_name)
            )
        return cls(vnet_name, jid, ifaces)

    @staticmethod
    def attach_jid(jid: int):
        """Attach the current process to jail *jid* using libc ``jail_attach``.

        Raises:
            Exception: if libc cannot be located or ``jail_attach`` fails.
        """
        path: Optional[str] = find_library("c")
        if path is None:
            raise Exception("libc not found")
        # use_errno=True is required for ctypes.get_errno() to see the errno
        # set by the call; without it the reported value is always 0.
        libc = CDLL(path, use_errno=True)
        if libc.jail_attach(jid) != 0:
            raise Exception("jail_attach() failed: errno {}".format(get_errno()))

    def attach(self):
        """Attach the current process to this jail."""
        self.attach_jid(self.jid)


class SingleVnetTestTemplate(object):
    """Test template that runs each test method inside a fresh vnet jail.

    Subclasses set ``num_epairs`` and the prefix lists; the address at index
    *i* of a prefix list is assigned to the *i*-th created interface, and
    empty entries are skipped.
    """

    num_epairs = 1
    IPV6_PREFIXES: List[str] = []
    IPV4_PREFIXES: List[str] = []

    def setup_method(self, method):
        """Create the epairs and the jail, attach to it, and configure addresses."""
        test_name = method.__name__
        vnet_name = "jail_{}".format(test_name)
        ifaces = [
            VnetInterface.create_iface("epair") for _ in range(self.num_epairs)
        ]
        self.vnet = VnetInstance.create_with_interfaces(vnet_name, ifaces)
        # From here on the process itself runs inside the jail.
        self.vnet.attach()
        for i, addr in enumerate(self.IPV6_PREFIXES):
            if addr:
                iface = self.vnet.ifaces[i]
                iface.turn_up()
                iface.enable_ipv6()
                iface.setup_addr(addr)
        for i, addr in enumerate(self.IPV4_PREFIXES):
            if addr:
                iface = self.vnet.ifaces[i]
                iface.turn_up()
                iface.setup_addr(addr)

    def cleanup(self, nodeid: str):
        """Destroy every recorded jail and interface; *nodeid* is unused here."""
        print("==== vnet cleanup ===")
        VnetInstance.cleanup_vnets()
        VnetInterface.cleanup_ifaces()

    def run_cmd(self, cmd: str) -> str:
        """Run *cmd* through the shell and return its output, without echoing it."""
        return os.popen(cmd).read()