# SPDX-License-Identifier: GPL-2.0

import os
import time
from pathlib import Path
from lib.py import KsftSkipEx, KsftXfailEx
from lib.py import ksft_setup, wait_file
from lib.py import cmd, ethtool, ip, CmdExitFailure
from lib.py import NetNS, NetdevSimDev
from .remote import Remote


class NetDrvEnvBase:
    """
    Base class for NIC / host environments.

    Attributes:
      src_path: Path of the test source file, as passed by the caller
      test_dir: Path to the source directory of the test
      net_lib_dir: Path to the net/lib directory
      env: Process environment merged with the entries of an optional
           "net.config" file found in test_dir, as returned by ksft_setup()
      dev: Description of the local device (indexed by 'ifname' and
           'ifindex'), must be set by inheriting classes
    """
    def __init__(self, src_path):
        self.src_path = Path(src_path)
        self.test_dir = self.src_path.parent.resolve()
        self.net_lib_dir = (Path(__file__).parent / "../../../../net/lib").resolve()

        self.env = self._load_env_file()

        # Following attrs must be set by inheriting classes
        self.dev = None

    def _load_env_file(self):
        """
        Build the test environment from os.environ and "net.config".

        Each non-empty line of "net.config" (after removing anything that
        follows a '#') must have the form KEY=VALUE; these entries override
        the inherited environment. The resulting dict is passed through
        ksft_setup() and its return value is returned.

        Raises an Exception if a configuration line has no '='.
        """
        env = os.environ.copy()

        config = self.test_dir / "net.config"
        if not config.exists():
            return ksft_setup(env)

        with open(config, 'r', encoding='utf-8') as fp:
            for full_line in fp:
                # Strip comments and surrounding white space
                line = full_line.split('#', maxsplit=1)[0].strip()
                if not line:
                    continue
                pair = line.split('=', maxsplit=1)
                if len(pair) != 2:
                    raise Exception("Can't parse configuration line:", full_line)
                env[pair[0]] = pair[1]
        return ksft_setup(env)

    def __del__(self):
        # Nothing to release in the base class, subclasses override this
        pass

    def __enter__(self):
        """
        Bring the local device up and wait for its sysfs "carrier" file
        to read "1" before handing the environment to the "with" block.
        """
        ip(f"link set dev {self.dev['ifname']} up")
        wait_file(f"/sys/class/net/{self.dev['ifname']}/carrier",
                  lambda x: x.strip() == "1")

        return self

    def __exit__(self, ex_type, ex_value, ex_tb):
        """
        __exit__ gets called at the end of a "with" block.
        Cleanup is shared with the destructor.
        """
        self.__del__()


class NetDrvEnv(NetDrvEnvBase):
    """
    Class for a single NIC / host env, with no remote end
    """
    def __init__(self, src_path, nsim_test=None, **kwargs):
        """
        Use the interface named by NETIF if it is present in the
        environment, otherwise create a netdevsim device.

        Args:
          src_path: path of the test source file
          nsim_test: True if the test only works on netdevsim, False if it
                     does not work on netdevsim, None if it works on both
          kwargs: passed to NetdevSimDev() when a netdevsim is created

        Raises KsftXfailEx when nsim_test excludes the selected device type.
        """
        # Set before the base initializer runs: __del__ reads this attribute
        # and still gets invoked if __init__ raises part-way through.
        self._ns = None

        super().__init__(src_path)

        if 'NETIF' in self.env:
            if nsim_test is True:
                raise KsftXfailEx("Test only works on netdevsim")

            self.dev = ip("-d link show dev " + self.env['NETIF'], json=True)[0]
        else:
            if nsim_test is False:
                raise KsftXfailEx("Test does not work on netdevsim")

            self._ns = NetdevSimDev(**kwargs)
            self.dev = self._ns.nsims[0].dev
        self.ifname = self.dev['ifname']
        self.ifindex = self.dev['ifindex']

    def __del__(self):
        # Remove the netdevsim device, if we created one
        if self._ns:
            self._ns.remove()
            self._ns = None


class NetDrvEpEnv(NetDrvEnvBase):
    """
    Class for an environment with a local device and "remote endpoint"
    which can be used to send traffic in.

    For local testing it creates two network namespaces and a pair
    of netdevsim devices.
    """

    # Network prefixes used for local tests
    nsim_v4_pfx = "192.0.2."
    nsim_v6_pfx = "2001:db8::"

    def __init__(self, src_path, nsim_test=None):
        """
        Use the device and remote endpoint described by the environment
        (NETIF, LOCAL_V4/6, REMOTE_V4/6, REMOTE_TYPE, REMOTE_ARGS) if NETIF
        is set, otherwise create a local netdevsim pair.

        Args:
          src_path: path of the test source file
          nsim_test: True if the test only works on netdevsim, False if it
                     does not work on netdevsim, None if it works on both

        Raises KsftXfailEx when nsim_test excludes the selected device type.
        """
        # Things we try to destroy. Set before the base initializer runs:
        # __del__ reads these attributes and still gets invoked if __init__
        # raises part-way through.
        self.remote = None
        # These are for local testing state
        self._netns = None
        self._ns = None
        self._ns_peer = None

        super().__init__(src_path)

        self._stats_settle_time = None

        self.addr_v = {"4": None, "6": None}
        self.remote_addr_v = {"4": None, "6": None}

        if "NETIF" in self.env:
            if nsim_test is True:
                raise KsftXfailEx("Test only works on netdevsim")
            self._check_env()

            self.dev = ip("-d link show dev " + self.env['NETIF'], json=True)[0]

            self.addr_v["4"] = self.env.get("LOCAL_V4")
            self.addr_v["6"] = self.env.get("LOCAL_V6")
            self.remote_addr_v["4"] = self.env.get("REMOTE_V4")
            self.remote_addr_v["6"] = self.env.get("REMOTE_V6")
            kind = self.env["REMOTE_TYPE"]
            args = self.env["REMOTE_ARGS"]
        else:
            if nsim_test is False:
                raise KsftXfailEx("Test does not work on netdevsim")

            self.create_local()

            self.dev = self._ns.nsims[0].dev

            self.addr_v["4"] = self.nsim_v4_pfx + "1"
            self.addr_v["6"] = self.nsim_v6_pfx + "1"
            self.remote_addr_v["4"] = self.nsim_v4_pfx + "2"
            self.remote_addr_v["6"] = self.nsim_v6_pfx + "2"
            kind = "netns"
            args = self._netns.name

        self.remote = Remote(kind, args, src_path)

        # IPv6 is preferred whenever a local IPv6 address is available
        self.addr_ipver = "6" if self.addr_v["6"] else "4"
        self.addr = self.addr_v[self.addr_ipver]
        self.remote_addr = self.remote_addr_v[self.addr_ipver]

        # Bracketed addresses, some commands need IPv6 to be inside []
        if self.addr_v["6"]:
            self.baddr = f"[{self.addr_v['6']}]"
        else:
            self.baddr = self.addr_v["4"]
        if self.remote_addr_v["6"]:
            self.remote_baddr = f"[{self.remote_addr_v['6']}]"
        else:
            self.remote_baddr = self.remote_addr_v["4"]

        self.ifname = self.dev['ifname']
        self.ifindex = self.dev['ifindex']

        # resolve remote interface name
        self.remote_ifname = self.resolve_remote_ifc()
        self.remote_dev = ip("-d link show dev " + self.remote_ifname,
                             host=self.remote, json=True)[0]

        # Cache of command availability checks, see _require_cmd()
        self._required_cmd = {}

    def create_local(self):
        """
        Create the local test setup: a network namespace, one netdevsim in
        the current namespace and one in the new namespace, linked to each
        other, with the nsim_v4_pfx / nsim_v6_pfx addresses assigned
        (.1 / ::1 locally, .2 / ::2 for the peer) and both links set up.
        """
        self._netns = NetNS()
        self._ns = NetdevSimDev()
        self._ns_peer = NetdevSimDev(ns=self._netns)

        # The link request identifies each end as "<netns fd>:<ifindex>",
        # so both namespace files must stay open while it is written.
        with open("/proc/self/ns/net") as nsfd0, \
             open("/var/run/netns/" + self._netns.name) as nsfd1:
            ifi0 = self._ns.nsims[0].ifindex
            ifi1 = self._ns_peer.nsims[0].ifindex
            NetdevSimDev.ctrl_write('link_device',
                                    f'{nsfd0.fileno()}:{ifi0} {nsfd1.fileno()}:{ifi1}')

        ip(f" addr add dev {self._ns.nsims[0].ifname} {self.nsim_v4_pfx}1/24")
        ip(f"-6 addr add dev {self._ns.nsims[0].ifname} {self.nsim_v6_pfx}1/64 nodad")
        ip(f" link set dev {self._ns.nsims[0].ifname} up")

        ip(f" addr add dev {self._ns_peer.nsims[0].ifname} {self.nsim_v4_pfx}2/24", ns=self._netns)
        ip(f"-6 addr add dev {self._ns_peer.nsims[0].ifname} {self.nsim_v6_pfx}2/64 nodad", ns=self._netns)
        ip(f" link set dev {self._ns_peer.nsims[0].ifname} up", ns=self._netns)

    def _check_env(self):
        """
        Validate the configuration of a real (non-netdevsim) setup.

        At least one variable from each group in vars_needed must be set,
        and the IPv4 / IPv6 addresses must be given for both ends or for
        neither. Raises an Exception listing the missing groups otherwise.
        """
        vars_needed = [
            ["LOCAL_V4", "LOCAL_V6"],
            ["REMOTE_V4", "REMOTE_V6"],
            ["REMOTE_TYPE"],
            ["REMOTE_ARGS"]
        ]
        missing = []

        for choice in vars_needed:
            if not any(entry in self.env for entry in choice):
                missing.append(choice)
        # Make sure v4 / v6 configs are symmetric
        if ("LOCAL_V6" in self.env) != ("REMOTE_V6" in self.env):
            missing.append(["LOCAL_V6", "REMOTE_V6"])
        if ("LOCAL_V4" in self.env) != ("REMOTE_V4" in self.env):
            missing.append(["LOCAL_V4", "REMOTE_V4"])
        if missing:
            raise Exception("Invalid environment, missing configuration:", missing,
                            "Please see tools/testing/selftests/drivers/net/README.rst")

    def resolve_remote_ifc(self):
        """
        Return the name of the remote interface which carries the
        configured remote address(es).

        Raises an Exception if the IPv4 and IPv6 addresses are found on
        different interfaces, if an address matches more than one
        interface, or if no interface matches at all.
        """
        v4 = v6 = None
        if self.remote_addr_v["4"]:
            v4 = ip("addr show to " + self.remote_addr_v["4"], json=True, host=self.remote)
        if self.remote_addr_v["6"]:
            v6 = ip("addr show to " + self.remote_addr_v["6"], json=True, host=self.remote)
        if v4 and v6 and v4[0]["ifname"] != v6[0]["ifname"]:
            raise Exception("Can't resolve remote interface name, v4 and v6 don't match")
        if (v4 and len(v4) > 1) or (v6 and len(v6) > 1):
            raise Exception("Can't resolve remote interface name, multiple interfaces match")
        # Without this check an empty / missing result would surface as a
        # bare IndexError or TypeError from the subscript below.
        if not v4 and not v6:
            raise Exception("Can't resolve remote interface name, no interface matches the remote address")
        return v6[0]["ifname"] if v6 else v4[0]["ifname"]

    def __del__(self):
        # Tear down whatever was created, resetting each attribute so that
        # a second call (e.g. __exit__ followed by the destructor) is a nop
        if self._ns:
            self._ns.remove()
            self._ns = None
        if self._ns_peer:
            self._ns_peer.remove()
            self._ns_peer = None
        if self._netns:
            del self._netns
            self._netns = None
        if self.remote:
            del self.remote
            self.remote = None

    def require_ipver(self, ipver):
        """
        Skip the test (KsftSkipEx) unless both the local and the remote
        address of the given IP version ("4" or "6") are configured.
        """
        if not self.addr_v[ipver] or not self.remote_addr_v[ipver]:
            raise KsftSkipEx(f"Test requires IPv{ipver} connectivity")

    def require_nsim(self):
        """
        Raise KsftXfailEx unless the environment runs on netdevsim.
        """
        if self._ns is None:
            raise KsftXfailEx("Test only works on netdevsim")

    def _require_cmd(self, comm, key, host=None):
        """
        Return True if the command comm is found by "command -v" on host.

        Results are cached per command under key ("local" / "remote"),
        so each location is only probed once.
        """
        cached = self._required_cmd.setdefault(comm, {})
        if cached.get(key) is None:
            cached[key] = cmd("command -v -- " + comm, fail=False,
                              shell=True, host=host).ret == 0
        return cached[key]

    def require_cmd(self, comm, local=True, remote=False):
        """
        Skip the test (KsftSkipEx) if the command comm is not available
        locally (when local is set) or on the remote (when remote is set).
        """
        if local and not self._require_cmd(comm, "local"):
            raise KsftSkipEx("Test requires command: " + comm)
        if remote and not self._require_cmd(comm, "remote", host=self.remote):
            raise KsftSkipEx("Test requires (remote) command: " + comm)

    def wait_hw_stats_settle(self):
        """
        Wait for HW stats to become consistent, some devices DMA HW stats
        periodically so events won't be reflected until next sync.
        Good drivers will tell us via ethtool what their sync period is.
        """
        if self._stats_settle_time is None:
            data = {}
            try:
                data = ethtool("-c " + self.ifname, json=True)[0]
            except CmdExitFailure as e:
                # Devices without coalescing support just get the base delay
                if "Operation not supported" not in e.cmd.stderr:
                    raise

            # 25 msec base delay plus the reported period (usecs -> secs)
            self._stats_settle_time = 0.025 + \
                data.get('stats-block-usecs', 0) / 1000 / 1000

        time.sleep(self._stats_settle_time)