1#!/usr/local/bin/python3 2import copy 3import ipaddress 4import re 5import os 6import socket 7import sys 8import time 9from multiprocessing import Pipe 10from multiprocessing import Process 11from typing import Dict 12from typing import List 13from typing import NamedTuple 14 15from atf_python.sys.net.tools import ToolsHelper 16from atf_python.utils import BaseTest 17from atf_python.utils import libc 18 19 20def run_cmd(cmd: str, verbose=True) -> str: 21 print("run: '{}'".format(cmd)) 22 return os.popen(cmd).read() 23 24 25def get_topology_id(test_id: str) -> str: 26 """ 27 Gets a unique topology id based on the pytest test_id. 28 "test_ip6_output.py::TestIP6Output::test_output6_pktinfo[ipandif]" -> 29 "TestIP6Output:test_output6_pktinfo[ipandif]" 30 """ 31 return ":".join(test_id.split("::")[-2:]) 32 33 34def convert_test_name(test_name: str) -> str: 35 """Convert test name to a string that can be used in the file/jail names""" 36 ret = "" 37 for char in test_name: 38 if char.isalnum() or char in ("_", "-", ":"): 39 ret += char 40 elif char in ("["): 41 ret += "_" 42 return ret 43 44 45class VnetInterface(object): 46 # defines from net/if_types.h 47 IFT_LOOP = 0x18 48 IFT_ETHER = 0x06 49 50 def __init__(self, iface_alias: str, iface_name: str): 51 self.name = iface_name 52 self.alias = iface_alias 53 self.vnet_name = "" 54 self.jailed = False 55 self.addr_map: Dict[str, Dict] = {"inet6": {}, "inet": {}} 56 self.prefixes4: List[List[str]] = [] 57 self.prefixes6: List[List[str]] = [] 58 if iface_name.startswith("lo"): 59 self.iftype = self.IFT_LOOP 60 else: 61 self.iftype = self.IFT_ETHER 62 63 @property 64 def ifindex(self): 65 return socket.if_nametoindex(self.name) 66 67 @property 68 def first_ipv6(self): 69 d = self.addr_map["inet6"] 70 return d[next(iter(d))] 71 72 @property 73 def first_ipv4(self): 74 d = self.addr_map["inet"] 75 return d[next(iter(d))] 76 77 def set_vnet(self, vnet_name: str): 78 self.vnet_name = vnet_name 79 80 def set_jailed(self, jailed: bool): 81 self.jailed = jailed 82 83 def run_cmd( 84 self, 85 cmd, 86 verbose=False, 87 ): 88 if self.vnet_name and not self.jailed: 89 cmd = "jexec {} {}".format(self.vnet_name, cmd) 90 return run_cmd(cmd, verbose) 91 92 @classmethod 93 def setup_loopback(cls, vnet_name: str): 94 lo = VnetInterface("", "lo0") 95 lo.set_vnet(vnet_name) 96 lo.setup_addr("127.0.0.1/8") 97 lo.turn_up() 98 99 @classmethod 100 def create_iface(cls, alias_name: str, iface_name: str) -> List["VnetInterface"]: 101 name = run_cmd("/sbin/ifconfig {} create".format(iface_name)).rstrip() 102 if not name: 103 raise Exception("Unable to create iface {}".format(iface_name)) 104 ret = [cls(alias_name, name)] 105 if name.startswith("epair"): 106 ret.append(cls(alias_name, name[:-1] + "b")) 107 return ret 108 109 def setup_addr(self, _addr: str): 110 addr = ipaddress.ip_interface(_addr) 111 if addr.version == 6: 112 family = "inet6" 113 cmd = "/sbin/ifconfig {} {} {}".format(self.name, family, addr) 114 else: 115 family = "inet" 116 if self.addr_map[family]: 117 cmd = "/sbin/ifconfig {} alias {}".format(self.name, addr) 118 else: 119 cmd = "/sbin/ifconfig {} {} {}".format(self.name, family, addr) 120 self.run_cmd(cmd) 121 self.addr_map[family][str(addr.ip)] = addr 122 123 def delete_addr(self, _addr: str): 124 addr = ipaddress.ip_address(_addr) 125 if addr.version == 6: 126 family = "inet6" 127 cmd = "/sbin/ifconfig {} inet6 {} delete".format(self.name, addr) 128 else: 129 family = "inet" 130 cmd = "/sbin/ifconfig {} -alias {}".format(self.name, addr) 131 self.run_cmd(cmd) 132 del self.addr_map[family][str(addr)] 133 134 def turn_up(self): 135 cmd = "/sbin/ifconfig {} up".format(self.name) 136 self.run_cmd(cmd) 137 138 def enable_ipv6(self): 139 cmd = "/usr/sbin/ndp -i {} -disabled".format(self.name) 140 self.run_cmd(cmd) 141 142 def has_tentative(self) -> bool: 143 """True if an interface has some addresses in tenative state""" 144 cmd = "/sbin/ifconfig {} inet6".format(self.name) 145 out = self.run_cmd(cmd, verbose=False) 146 for line in out.splitlines(): 147 if "tentative" in line: 148 return True 149 return False 150 151 152class IfaceFactory(object): 153 INTERFACES_FNAME = "created_ifaces.lst" 154 AUTODELETE_TYPES = ("epair", "lo", "tap", "tun") 155 156 def __init__(self): 157 self.file_name = self.INTERFACES_FNAME 158 159 def _register_iface(self, iface_name: str): 160 with open(self.file_name, "a") as f: 161 f.write(iface_name + "\n") 162 163 def _list_ifaces(self) -> List[str]: 164 ret: List[str] = [] 165 try: 166 with open(self.file_name, "r") as f: 167 for line in f: 168 ret.append(line.strip()) 169 except OSError: 170 pass 171 return ret 172 173 def create_iface(self, alias_name: str, iface_name: str) -> List[VnetInterface]: 174 ifaces = VnetInterface.create_iface(alias_name, iface_name) 175 for iface in ifaces: 176 if not self.is_autodeleted(iface.name): 177 self._register_iface(iface.name) 178 return ifaces 179 180 @staticmethod 181 def is_autodeleted(iface_name: str) -> bool: 182 iface_type = re.split(r"\d+", iface_name)[0] 183 return iface_type in IfaceFactory.AUTODELETE_TYPES 184 185 def cleanup_vnet_interfaces(self, vnet_name: str) -> List[str]: 186 """Destroys""" 187 ifaces_lst = ToolsHelper.get_output( 188 "/usr/sbin/jexec {} ifconfig -l".format(vnet_name) 189 ) 190 for iface_name in ifaces_lst.split(): 191 if not self.is_autodeleted(iface_name): 192 if iface_name not in self._list_ifaces(): 193 print("Skipping interface {}:{}".format(vnet_name, iface_name)) 194 continue 195 run_cmd( 196 "/usr/sbin/jexec {} ifconfig {} destroy".format(vnet_name, iface_name) 197 ) 198 199 def cleanup(self): 200 try: 201 os.unlink(self.INTERFACES_FNAME) 202 except OSError: 203 pass 204 205 206class VnetInstance(object): 207 def __init__( 208 self, vnet_alias: str, vnet_name: str, jid: int, ifaces: List[VnetInterface] 209 ): 210 self.name = vnet_name 211 self.alias = vnet_alias # reference in the test topology 212 self.jid = jid 213 self.ifaces = ifaces 214 self.iface_alias_map = {} # iface.alias: iface 215 self.iface_map = {} # iface.name: iface 216 for iface in ifaces: 217 iface.set_vnet(vnet_name) 218 iface.set_jailed(True) 219 self.iface_alias_map[iface.alias] = iface 220 self.iface_map[iface.name] = iface 221 self.need_dad = False # Disable duplicate address detection by default 222 self.attached = False 223 self.pipe = None 224 self.subprocess = None 225 226 def run_vnet_cmd(self, cmd): 227 if not self.attached: 228 cmd = "jexec {} {}".format(self.name, cmd) 229 return run_cmd(cmd) 230 231 def disable_dad(self): 232 self.run_vnet_cmd("/sbin/sysctl net.inet6.ip6.dad_count=0") 233 234 def set_pipe(self, pipe): 235 self.pipe = pipe 236 237 def set_subprocess(self, p): 238 self.subprocess = p 239 240 @staticmethod 241 def attach_jid(jid: int): 242 error_code = libc.jail_attach(jid) 243 if error_code != 0: 244 raise Exception("jail_attach() failed: errno {}".format(error_code)) 245 246 def attach(self): 247 self.attach_jid(self.jid) 248 self.attached = True 249 250 251class VnetFactory(object): 252 JAILS_FNAME = "created_jails.lst" 253 254 def __init__(self, topology_id: str): 255 self.topology_id = topology_id 256 self.file_name = self.JAILS_FNAME 257 self._vnets: List[str] = [] 258 259 def _register_vnet(self, vnet_name: str): 260 self._vnets.append(vnet_name) 261 with open(self.file_name, "a") as f: 262 f.write(vnet_name + "\n") 263 264 @staticmethod 265 def _wait_interfaces(vnet_name: str, ifaces: List[str]) -> List[str]: 266 cmd = "jexec {} /sbin/ifconfig -l".format(vnet_name) 267 not_matched: List[str] = [] 268 for i in range(50): 269 vnet_ifaces = run_cmd(cmd).strip().split(" ") 270 not_matched = [] 271 for iface_name in ifaces: 272 if iface_name not in vnet_ifaces: 273 not_matched.append(iface_name) 274 if len(not_matched) == 0: 275 return [] 276 time.sleep(0.1) 277 return not_matched 278 279 def create_vnet(self, vnet_alias: str, ifaces: List[VnetInterface]): 280 vnet_name = "pytest:{}".format(convert_test_name(self.topology_id)) 281 if self._vnets: 282 # add number to distinguish jails 283 vnet_name = "{}_{}".format(vnet_name, len(self._vnets) + 1) 284 iface_cmds = " ".join(["vnet.interface={}".format(i.name) for i in ifaces]) 285 cmd = "/usr/sbin/jail -i -c name={} persist vnet {}".format( 286 vnet_name, iface_cmds 287 ) 288 jid = 0 289 try: 290 jid_str = run_cmd(cmd) 291 jid = int(jid_str) 292 except ValueError: 293 print("Jail creation failed, output: {}".format(jid_str)) 294 raise 295 self._register_vnet(vnet_name) 296 297 # Run expedited version of routing 298 VnetInterface.setup_loopback(vnet_name) 299 300 not_found = self._wait_interfaces(vnet_name, [i.name for i in ifaces]) 301 if not_found: 302 raise Exception( 303 "Interfaces {} has not appeared in vnet {}".format(not_found, vnet_name) 304 ) 305 return VnetInstance(vnet_alias, vnet_name, jid, ifaces) 306 307 def cleanup(self): 308 iface_factory = IfaceFactory() 309 try: 310 with open(self.file_name) as f: 311 for line in f: 312 vnet_name = line.strip() 313 iface_factory.cleanup_vnet_interfaces(vnet_name) 314 run_cmd("/usr/sbin/jail -r {}".format(vnet_name)) 315 os.unlink(self.JAILS_FNAME) 316 except OSError: 317 pass 318 319 320class SingleInterfaceMap(NamedTuple): 321 ifaces: List[VnetInterface] 322 vnet_aliases: List[str] 323 324 325class ObjectsMap(NamedTuple): 326 iface_map: Dict[str, SingleInterfaceMap] # keyed by ifX 327 vnet_map: Dict[str, VnetInstance] # keyed by vnetX 328 topo_map: Dict # self.TOPOLOGY 329 330 331class VnetTestTemplate(BaseTest): 332 NEED_ROOT: bool = True 333 TOPOLOGY = {} 334 335 def _get_vnet_handler(self, vnet_alias: str): 336 handler_name = "{}_handler".format(vnet_alias) 337 return getattr(self, handler_name, None) 338 339 def _setup_vnet(self, vnet: VnetInstance, obj_map: Dict, pipe): 340 """Base Handler to setup given VNET. 341 Can be run in a subprocess. If so, passes control to the special 342 vnetX_handler() after setting up interface addresses 343 """ 344 vnet.attach() 345 print("# setup_vnet({})".format(vnet.name)) 346 if pipe is not None: 347 vnet.set_pipe(pipe) 348 349 topo = obj_map.topo_map 350 ipv6_ifaces = [] 351 # Disable DAD 352 if not vnet.need_dad: 353 vnet.disable_dad() 354 for iface in vnet.ifaces: 355 # check index of vnet within an interface 356 # as we have prefixes for both ends of the interface 357 iface_map = obj_map.iface_map[iface.alias] 358 idx = iface_map.vnet_aliases.index(vnet.alias) 359 prefixes6 = topo[iface.alias].get("prefixes6", []) 360 prefixes4 = topo[iface.alias].get("prefixes4", []) 361 if prefixes6 or prefixes4: 362 ipv6_ifaces.append(iface) 363 iface.turn_up() 364 if prefixes6: 365 iface.enable_ipv6() 366 for prefix in prefixes6 + prefixes4: 367 iface.setup_addr(prefix[idx]) 368 for iface in ipv6_ifaces: 369 while iface.has_tentative(): 370 time.sleep(0.1) 371 372 # Run actual handler 373 handler = self._get_vnet_handler(vnet.alias) 374 if handler: 375 # Do unbuffered stdout for children 376 # so the logs are present if the child hangs 377 sys.stdout.reconfigure(line_buffering=True) 378 self.drop_privileges() 379 handler(vnet) 380 381 def setup_topology(self, topo: Dict, topology_id: str): 382 """Creates jails & interfaces for the provided topology""" 383 iface_map: Dict[str, SingleInterfaceMap] = {} 384 vnet_map = {} 385 iface_factory = IfaceFactory() 386 vnet_factory = VnetFactory(topology_id) 387 for obj_name, obj_data in topo.items(): 388 if obj_name.startswith("if"): 389 epair_ifaces = iface_factory.create_iface(obj_name, "epair") 390 smap = SingleInterfaceMap(epair_ifaces, []) 391 iface_map[obj_name] = smap 392 for obj_name, obj_data in topo.items(): 393 if obj_name.startswith("vnet"): 394 vnet_ifaces = [] 395 for iface_alias in obj_data["ifaces"]: 396 # epair creates 2 interfaces, grab first _available_ 397 # and map it to the VNET being created 398 idx = len(iface_map[iface_alias].vnet_aliases) 399 iface_map[iface_alias].vnet_aliases.append(obj_name) 400 vnet_ifaces.append(iface_map[iface_alias].ifaces[idx]) 401 vnet = vnet_factory.create_vnet(obj_name, vnet_ifaces) 402 vnet_map[obj_name] = vnet 403 # Debug output 404 print("============= TEST TOPOLOGY =============") 405 for vnet_alias, vnet in vnet_map.items(): 406 print("# vnet {} -> {}".format(vnet.alias, vnet.name), end="") 407 handler = self._get_vnet_handler(vnet.alias) 408 if handler: 409 print(" handler: {}".format(handler.__name__), end="") 410 print() 411 for iface_alias, iface_data in iface_map.items(): 412 vnets = iface_data.vnet_aliases 413 ifaces: List[VnetInterface] = iface_data.ifaces 414 if len(vnets) == 1 and len(ifaces) == 2: 415 print( 416 "# iface {}: {}::{} -> main::{}".format( 417 iface_alias, vnets[0], ifaces[0].name, ifaces[1].name 418 ) 419 ) 420 elif len(vnets) == 2 and len(ifaces) == 2: 421 print( 422 "# iface {}: {}::{} -> {}::{}".format( 423 iface_alias, vnets[0], ifaces[0].name, vnets[1], ifaces[1].name 424 ) 425 ) 426 else: 427 print( 428 "# iface {}: ifaces: {} vnets: {}".format( 429 iface_alias, vnets, [i.name for i in ifaces] 430 ) 431 ) 432 print() 433 return ObjectsMap(iface_map, vnet_map, topo) 434 435 def setup_method(self, _method): 436 """Sets up all the required topology and handlers for the given test""" 437 super().setup_method(_method) 438 # TestIP6Output.test_output6_pktinfo[ipandif] 439 topology_id = get_topology_id(self.test_id) 440 topology = self.TOPOLOGY 441 # First, setup kernel objects - interfaces & vnets 442 obj_map = self.setup_topology(topology, topology_id) 443 main_vnet = None # one without subprocess handler 444 for vnet_alias, vnet in obj_map.vnet_map.items(): 445 if self._get_vnet_handler(vnet_alias): 446 # Need subprocess to run 447 parent_pipe, child_pipe = Pipe() 448 p = Process( 449 target=self._setup_vnet, 450 args=( 451 vnet, 452 obj_map, 453 child_pipe, 454 ), 455 ) 456 vnet.set_pipe(parent_pipe) 457 vnet.set_subprocess(p) 458 p.start() 459 else: 460 if main_vnet is not None: 461 raise Exception("there can be only 1 VNET w/o handler") 462 main_vnet = vnet 463 # Main vnet needs to be the last, so all the other subprocesses 464 # are started & their pipe handles collected 465 self.vnet = main_vnet 466 self._setup_vnet(main_vnet, obj_map, None) 467 # Save state for the main handler 468 self.iface_map = obj_map.iface_map 469 self.vnet_map = obj_map.vnet_map 470 self.drop_privileges() 471 472 def cleanup(self, test_id: str): 473 # pytest test id: file::class::test_name 474 topology_id = get_topology_id(self.test_id) 475 476 print("==== vnet cleanup ===") 477 print("# topology_id: '{}'".format(topology_id)) 478 VnetFactory(topology_id).cleanup() 479 IfaceFactory().cleanup() 480 481 def wait_object(self, pipe, timeout=5): 482 if pipe.poll(timeout): 483 return pipe.recv() 484 raise TimeoutError 485 486 def send_object(self, pipe, obj): 487 pipe.send(obj) 488 489 @property 490 def curvnet(self): 491 pass 492 493 494class SingleVnetTestTemplate(VnetTestTemplate): 495 IPV6_PREFIXES: List[str] = [] 496 IPV4_PREFIXES: List[str] = [] 497 498 def setup_method(self, method): 499 topology = copy.deepcopy( 500 { 501 "vnet1": {"ifaces": ["if1"]}, 502 "if1": {"prefixes4": [], "prefixes6": []}, 503 } 504 ) 505 for prefix in self.IPV6_PREFIXES: 506 topology["if1"]["prefixes6"].append((prefix,)) 507 for prefix in self.IPV4_PREFIXES: 508 topology["if1"]["prefixes4"].append((prefix,)) 509 self.TOPOLOGY = topology 510 super().setup_method(method) 511