# SPDX-License-Identifier: GPL-2.0

import ctypes
import os
import random
import string
import subprocess
import time
from pathlib import Path

from .utils import ip

# use_errno=True makes ctypes keep a copy of errno after each foreign call,
# so a failed setns() can be reported through ctypes.get_errno().
libc = ctypes.CDLL('libc.so.6', use_errno=True)


def _setns(fd):
    """Call setns(fd, 0) via libc and raise OSError if it reports failure."""
    if libc.setns(fd, 0) != 0:
        err = ctypes.get_errno()
        raise OSError(err, os.strerror(err))


class NetNS:
    """Named network namespace added on construction and deleted on cleanup.

    The namespace is managed through the ``ip`` helper ('netns add' /
    'netns del').  When no name is given, a random 8-letter lowercase name
    is generated.  Usable as a context manager; ``str(ns)`` is the name.
    """

    def __init__(self, name=None):
        # __del__ also runs for objects whose __init__ raised.  Keep the
        # name unset until 'netns add' has returned so that, if ip() raises
        # (e.g. the caller-supplied name already exists), cleanup does not
        # delete a namespace this object never created.
        self.name = None
        if not name:
            name = ''.join(
                random.choice(string.ascii_lowercase) for _ in range(8))
        ip('netns add ' + name)
        self.name = name

    def __del__(self):
        # self.name doubles as the "still owned" flag; clearing it makes a
        # second call (explicit __exit__ followed by GC) a no-op.
        if self.name:
            ip('netns del ' + self.name)
            self.name = None

    def __enter__(self):
        return self

    def __exit__(self, ex_type, ex_value, ex_tb):
        self.__del__()

    def __str__(self):
        return self.name

    def __repr__(self):
        return f"NetNS({self.name})"


class UserNetNS:
    """Network namespace owned by a non-init user namespace.

    A short-lived ``unshare --user --net --map-root-user`` child creates the
    two namespaces; they are then bind-mounted onto /run/userns/<name> and
    /run/netns/<name>, after which the child is killed.  Cleanup unmounts
    both bind mounts and removes the mount-point files.
    """

    def __init__(self):
        self.name = ''.join(
            random.choice(string.ascii_lowercase) for _ in range(8))
        self.user_ns_path = f"/run/userns/{self.name}"
        self.net_ns_path = f"/run/netns/{self.name}"
        # Set before anything that can fail so __del__ always finds them.
        self._user_mounted = False
        self._net_mounted = False

        os.makedirs("/run/userns", exist_ok=True)
        os.makedirs("/run/netns", exist_ok=True)

        # Bind mounts need existing files as mount points.
        Path(self.user_ns_path).touch()
        Path(self.net_ns_path).touch()

        with subprocess.Popen(
                ["unshare", "--user", "--net", "--map-root-user",
                 "sleep", "infinity"]) as proc:
            try:
                pid = proc.pid
                init_user = os.readlink("/proc/self/ns/user")
                # Poll (200 x 10 ms) until the child's user namespace link
                # differs from ours, i.e. the child has unshared.
                for _ in range(200):
                    # Fail fast instead of polling a dead child for the
                    # whole timeout.
                    if proc.poll() is not None:
                        raise RuntimeError(
                            "unshare exited early with status "
                            f"{proc.returncode}")
                    try:
                        if os.readlink(f"/proc/{pid}/ns/user") != init_user:
                            break
                    except OSError:
                        # /proc entry not readable yet; retry.
                        pass
                    time.sleep(0.01)
                else:
                    raise RuntimeError("unshare child did not create userns")

                subprocess.run(["mount", "--bind", f"/proc/{pid}/ns/user",
                                self.user_ns_path], check=True)
                self._user_mounted = True
                subprocess.run(["mount", "--bind", f"/proc/{pid}/ns/net",
                                self.net_ns_path], check=True)
                self._net_mounted = True
            finally:
                # The child is only needed until the bind mounts exist.
                proc.kill()

    def __del__(self):
        # Best-effort teardown: umount failures are ignored (check=False)
        # and missing mount-point files are tolerated.
        if self._net_mounted:
            subprocess.run(["umount", self.net_ns_path], check=False)
            self._net_mounted = False
        if self._user_mounted:
            subprocess.run(["umount", self.user_ns_path], check=False)
            self._user_mounted = False
        for path in (self.net_ns_path, self.user_ns_path):
            try:
                os.unlink(path)
            except OSError:
                pass

    def __enter__(self):
        return self

    def __exit__(self, ex_type, ex_value, ex_tb):
        self.__del__()

    def __str__(self):
        return self.name

    def __repr__(self):
        return f"UserNetNS({self.name})"


class NetNSEnter:
    """Context manager that switches into /run/netns/<ns_name>.

    On entry the current network namespace (read from
    /proc/thread-self/ns/net) is saved and setns() is called on the target;
    on exit setns() is called on the saved namespace.

    Raises:
        OSError: if a namespace file cannot be opened or setns() fails.
    """

    def __init__(self, ns_name):
        # ns_name is formatted with an f-string, so any object whose str()
        # is the namespace name is accepted.
        self.ns_path = f"/run/netns/{ns_name}"

    def __enter__(self):
        self.saved = open("/proc/thread-self/ns/net")
        try:
            with open(self.ns_path) as ns_file:
                _setns(ns_file.fileno())
        except BaseException:
            # __exit__ is not called when __enter__ raises, so release the
            # saved descriptor here.
            self.saved.close()
            raise
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        try:
            _setns(self.saved.fileno())
        finally:
            self.saved.close()