xref: /linux/tools/testing/selftests/net/lib/py/netns.py (revision 45079e00133ee78fd216ccc4285534044ea69173)
1# SPDX-License-Identifier: GPL-2.0
2
3import ctypes
4import os
5import random
6import string
7import subprocess
8import time
9from pathlib import Path
10
11from .utils import ip
12
# Handle to the C library; this file uses it to call setns(2).
# use_errno=True makes ctypes keep a copy of errno after every call made
# through this handle, so a failing call can be diagnosed with
# ctypes.get_errno().
libc = ctypes.CDLL('libc.so.6', use_errno=True)
14
15
class NetNS:
    """Named network namespace managed through ``ip netns``.

    The namespace is added when the object is constructed and deleted when
    the object is finalized or when a ``with`` block using it exits.
    """

    def __init__(self, name=None):
        """Add a network namespace.

        Args:
            name: Namespace name.  When omitted (or empty), a random name
                made of eight lowercase ASCII letters is used.
        """
        # Set before anything below can raise: __del__ still runs on an
        # object whose __init__ failed part-way.
        self._created = False
        if name:
            self.name = name
        else:
            self.name = ''.join(random.choice(string.ascii_lowercase) for _ in range(8))
        ip('netns add ' + self.name)
        # Only reached if the add command did not raise, so a namespace this
        # object failed to add (e.g. the name was already taken) is never
        # deleted by it.
        self._created = True

    def __del__(self):
        """Delete the namespace if this object added it; otherwise do nothing."""
        # getattr() covers instances whose __init__ never ran; the name check
        # keeps the original "no name, nothing to delete" guard.
        if not getattr(self, '_created', False) or not self.name:
            return
        ip('netns del ' + self.name)
        self._created = False
        self.name = None

    def __enter__(self):
        """Return the namespace object itself for use in a ``with`` block."""
        return self

    def __exit__(self, ex_type, ex_value, ex_tb):
        """Delete the namespace when the ``with`` block ends."""
        self.__del__()

    def __str__(self):
        """Return the namespace name."""
        return self.name

    def __repr__(self):
        return f"NetNS({self.name})"
40
41
class UserNetNS:
    """Network namespace owned by a non-init user namespace."""

    def __init__(self):
        """Create the namespaces and bind-mount them under /run.

        A random eight-letter name is chosen and empty files are created at
        /run/userns/<name> and /run/netns/<name>.  An ``unshare --user --net
        --map-root-user sleep infinity`` helper is started, its
        /proc/<pid>/ns/user and /proc/<pid>/ns/net entries are bind-mounted
        onto those files, and the helper is then killed (also on failure).

        Raises:
            RuntimeError: the helper exited early, or was not seen in a new
                user namespace within the polling window.
            subprocess.CalledProcessError: one of the bind mounts failed.
        """
        self.name = ''.join(
            random.choice(string.ascii_lowercase) for _ in range(8))
        self.user_ns_path = f"/run/userns/{self.name}"
        self.net_ns_path = f"/run/netns/{self.name}"
        # Track what was mounted so __del__ only unmounts what exists.
        self._user_mounted = False
        self._net_mounted = False

        os.makedirs("/run/userns", exist_ok=True)
        os.makedirs("/run/netns", exist_ok=True)

        # Bind mounts need existing files as mount points.
        Path(self.user_ns_path).touch()
        Path(self.net_ns_path).touch()

        with subprocess.Popen(
                ["unshare", "--user", "--net", "--map-root-user",
                 "sleep", "infinity"]) as proc:
            try:
                pid = proc.pid
                init_user = os.readlink("/proc/self/ns/user")
                self._wait_for_userns(proc, init_user)

                subprocess.run(["mount", "--bind", f"/proc/{pid}/ns/user",
                                self.user_ns_path], check=True)
                self._user_mounted = True
                subprocess.run(["mount", "--bind", f"/proc/{pid}/ns/net",
                                self.net_ns_path], check=True)
                self._net_mounted = True
            finally:
                # The helper is only needed until the mounts are in place.
                proc.kill()

    @staticmethod
    def _wait_for_userns(proc, init_user, attempts=200, interval=0.01):
        """Wait until the helper process is in a different user namespace.

        Args:
            proc: The ``subprocess.Popen`` object of the helper.
            init_user: ``/proc/self/ns/user`` link target of this process.
            attempts: Maximum number of polls.
            interval: Seconds to sleep between polls.

        Raises:
            RuntimeError: the helper exited, or its user namespace link never
                differed from ``init_user`` within ``attempts`` polls.
        """
        for _ in range(attempts):
            # Fail fast with the real cause instead of polling a dead child
            # for the whole window.
            status = proc.poll()
            if status is not None:
                raise RuntimeError(
                    f"unshare child exited early (status {status})")
            try:
                if os.readlink(f"/proc/{proc.pid}/ns/user") != init_user:
                    return
            except OSError:
                # The /proc entry may not be readable yet; retry.
                pass
            time.sleep(interval)
        raise RuntimeError("unshare child did not create userns")

    def __del__(self):
        """Unmount whatever was mounted and remove both mount-point files.

        Cleanup is best-effort: umount failures and unlink errors are
        ignored, so calling this more than once is harmless.
        """
        if self._net_mounted:
            subprocess.run(["umount", self.net_ns_path], check=False)
            self._net_mounted = False
        if self._user_mounted:
            subprocess.run(["umount", self.user_ns_path], check=False)
            self._user_mounted = False
        for path in (self.net_ns_path, self.user_ns_path):
            try:
                os.unlink(path)
            except OSError:
                pass

    def __enter__(self):
        """Return the namespace object itself for use in a ``with`` block."""
        return self

    def __exit__(self, ex_type, ex_value, ex_tb):
        """Tear the namespaces down when the ``with`` block ends."""
        self.__del__()

    def __str__(self):
        """Return the namespace name."""
        return self.name

    def __repr__(self):
        return f"UserNetNS({self.name})"
108
109
class NetNSEnter:
    """Context manager that runs its body inside a named network namespace.

    On entry, /proc/thread-self/ns/net is opened to remember the current
    namespace and setns() is called on /run/netns/<ns_name>.  On exit,
    setns() is called on the remembered namespace and its file is closed.
    """

    def __init__(self, ns_name):
        """Remember the path of the namespace to enter.

        Args:
            ns_name: Name of an entry under /run/netns.
        """
        self.ns_path = f"/run/netns/{ns_name}"
        # Open file for the namespace to return to; set by __enter__.
        self.saved = None

    @staticmethod
    def _setns(ns_file, what):
        """Call setns() on an open namespace file and raise if it fails.

        Args:
            ns_file: Open file object referring to a namespace.
            what: Description of the target, used in the error message.

        Raises:
            OSError: setns() returned non-zero.
        """
        if libc.setns(ns_file.fileno(), 0) == 0:
            return
        # ctypes only records errno when libc was loaded with
        # use_errno=True; otherwise this is 0 and a generic error is raised.
        err = ctypes.get_errno()
        if err:
            raise OSError(err, f"setns({what}): {os.strerror(err)}")
        raise OSError(f"setns({what}) failed")

    def __enter__(self):
        """Switch into the target namespace.

        Raises:
            OSError: a namespace file could not be opened, or setns() failed.
        """
        saved = open("/proc/thread-self/ns/net")
        try:
            with open(self.ns_path) as ns_file:
                self._setns(ns_file, self.ns_path)
        except BaseException:
            # Do not leak the saved-namespace fd when entering fails.
            saved.close()
            raise
        self.saved = saved
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Switch back to the saved namespace and close its file.

        Raises:
            OSError: setns() on the saved namespace failed.
        """
        try:
            self._setns(self.saved, "saved network namespace")
        finally:
            self.saved.close()
123