# SPDX-License-Identifier: GPL-2.0

import os
import time
from pathlib import Path
from lib.py import KsftSkipEx, KsftXfailEx
from lib.py import ksft_setup, wait_file
from lib.py import cmd, ethtool, ip, CmdExitFailure
from lib.py import NetNS, NetdevSimDev
from .remote import Remote


class NetDrvEnvBase:
    """
    Base class for NIC / host environments

    Attributes:
      test_dir: Path to the source directory of the test
      net_lib_dir: Path to the net/lib directory
    """
    def __init__(self, src_path):
        """
        Record the test location and load the environment configuration.

        src_path is the path of the test source file; "net.config" is
        looked up in the directory containing it.
        """
        self.src_path = Path(src_path)
        self.test_dir = self.src_path.parent.resolve()
        self.net_lib_dir = (Path(__file__).parent / "../../../../net/lib").resolve()

        self.env = self._load_env_file()

        # Following attrs must be set by inheriting classes
        self.dev = None

    def _load_env_file(self):
        """
        Build the test environment from os.environ plus "net.config".

        The config file is optional. When present, each line is expected
        to be KEY=VALUE; anything after a '#' is treated as a comment and
        blank lines are skipped. Entries from the file override variables
        inherited from the process environment.

        Returns whatever ksft_setup() returns for the merged dict.
        Raises Exception if a non-empty line has no '='.
        """
        env = os.environ.copy()

        config = self.src_path.parent.resolve() / "net.config"
        if not config.exists():
            return ksft_setup(env)

        with open(config.as_posix(), 'r') as fp:
            for raw_line in fp:
                # Strip comments
                line = raw_line.split("#", 1)[0].strip()
                if not line:
                    continue
                pair = line.split('=', maxsplit=1)
                if len(pair) != 2:
                    # Report the untouched line so the user can find it
                    raise Exception("Can't parse configuration line:", raw_line)
                env[pair[0]] = pair[1]
        return ksft_setup(env)

    def __del__(self):
        """
        Cleanup hook, nothing to release in the base class.
        """
        pass

    def __enter__(self):
        """
        __enter__ gets called at the start of a "with" block.

        Brings the device link up and waits until the carrier file in
        sysfs reads "1".
        """
        ip(f"link set dev {self.dev['ifname']} up")
        wait_file(f"/sys/class/net/{self.dev['ifname']}/carrier",
                  lambda x: x.strip() == "1")

        return self

    def __exit__(self, ex_type, ex_value, ex_tb):
        """
        __exit__ gets called at the end of a "with" block.

        Cleanup is shared with __del__, so subclasses only need to
        override __del__ (and make it safe to call more than once).
        """
        self.__del__()


class NetDrvEnv(NetDrvEnvBase):
    """
    Class for a single NIC / host env, with no remote end
    """
    def __init__(self, src_path, nsim_test=None, **kwargs):
        """
        Use the device named by NETIF if configured, otherwise create
        a netdevsim device (kwargs are passed to NetdevSimDev).

        nsim_test restricts where the test may run: True means netdevsim
        only, False means real devices only, None means either. A mismatch
        raises KsftXfailEx.
        """
        # Must exist before anything below can raise, __del__ reads it
        # even when construction fails half way.
        self._ns = None

        super().__init__(src_path)

        if 'NETIF' in self.env:
            if nsim_test is True:
                raise KsftXfailEx("Test only works on netdevsim")

            self.dev = ip("-d link show dev " + self.env['NETIF'], json=True)[0]
        else:
            if nsim_test is False:
                raise KsftXfailEx("Test does not work on netdevsim")

            self._ns = NetdevSimDev(**kwargs)
            self.dev = self._ns.nsims[0].dev
        self.ifname = self.dev['ifname']
        self.ifindex = self.dev['ifindex']

    def __del__(self):
        """
        Remove the netdevsim device, if one was created. Safe to call
        repeatedly, the reference is cleared after removal.
        """
        if self._ns:
            self._ns.remove()
            self._ns = None


class NetDrvEpEnv(NetDrvEnvBase):
    """
    Class for an environment with a local device and "remote endpoint"
    which can be used to send traffic in.

    For local testing it creates two network namespaces and a pair
    of netdevsim devices.
    """

    # Network prefixes used for local tests
    nsim_v4_pfx = "192.0.2."
    nsim_v6_pfx = "2001:db8::"

    def __init__(self, src_path, nsim_test=None):
        """
        Set up the local device and the remote endpoint.

        With NETIF in the environment the addresses and remote endpoint
        description are read from the environment (see _check_env() for
        the required variables). Without it a netdevsim pair is created
        with the peer placed in a new network namespace.

        nsim_test restricts where the test may run: True means netdevsim
        only, False means real devices only, None means either. A mismatch
        raises KsftXfailEx.
        """
        # Things we try to destroy. These must exist before anything
        # below can raise, __del__ reads them even when construction
        # fails half way.
        self.remote = None
        # These are for local testing state
        self._netns = None
        self._ns = None
        self._ns_peer = None

        super().__init__(src_path)

        self._stats_settle_time = None

        self.addr_v = {"4": None, "6": None}
        self.remote_addr_v = {"4": None, "6": None}

        if "NETIF" in self.env:
            if nsim_test is True:
                raise KsftXfailEx("Test only works on netdevsim")
            self._check_env()

            self.dev = ip("-d link show dev " + self.env['NETIF'], json=True)[0]

            self.addr_v["4"] = self.env.get("LOCAL_V4")
            self.addr_v["6"] = self.env.get("LOCAL_V6")
            self.remote_addr_v["4"] = self.env.get("REMOTE_V4")
            self.remote_addr_v["6"] = self.env.get("REMOTE_V6")
            kind = self.env["REMOTE_TYPE"]
            args = self.env["REMOTE_ARGS"]
        else:
            if nsim_test is False:
                raise KsftXfailEx("Test does not work on netdevsim")

            self.create_local()

            self.dev = self._ns.nsims[0].dev

            self.addr_v["4"] = self.nsim_v4_pfx + "1"
            self.addr_v["6"] = self.nsim_v6_pfx + "1"
            self.remote_addr_v["4"] = self.nsim_v4_pfx + "2"
            self.remote_addr_v["6"] = self.nsim_v6_pfx + "2"
            kind = "netns"
            args = self._netns.name

        self.remote = Remote(kind, args, src_path)

        # IPv6 is preferred whenever it is configured
        self.addr_ipver = "6" if self.addr_v["6"] else "4"
        self.addr = self.addr_v[self.addr_ipver]
        self.remote_addr = self.remote_addr_v[self.addr_ipver]

        # Bracketed addresses, some commands need IPv6 to be inside []
        if self.addr_v["6"]:
            self.baddr = f"[{self.addr_v['6']}]"
        else:
            self.baddr = self.addr_v["4"]
        if self.remote_addr_v["6"]:
            self.remote_baddr = f"[{self.remote_addr_v['6']}]"
        else:
            self.remote_baddr = self.remote_addr_v["4"]

        self.ifname = self.dev['ifname']
        self.ifindex = self.dev['ifindex']

        # resolve remote interface name
        self.remote_ifname = self.resolve_remote_ifc()

        # Results of command availability checks, see _require_cmd()
        self._required_cmd = {}

    def create_local(self):
        """
        Create the netdevsim based local test setup.

        Makes a new network namespace, one netdevsim device in the
        current namespace and one in the new namespace, links the two
        and configures the v4 / v6 test addresses on both ends.
        """
        self._netns = NetNS()
        self._ns = NetdevSimDev()
        self._ns_peer = NetdevSimDev(ns=self._netns)

        # The link request identifies each end as <netns fd>:<ifindex>
        with open("/proc/self/ns/net") as nsfd0, \
             open("/var/run/netns/" + self._netns.name) as nsfd1:
            ifi0 = self._ns.nsims[0].ifindex
            ifi1 = self._ns_peer.nsims[0].ifindex
            NetdevSimDev.ctrl_write('link_device',
                                    f'{nsfd0.fileno()}:{ifi0} {nsfd1.fileno()}:{ifi1}')

        ip(f" addr add dev {self._ns.nsims[0].ifname} {self.nsim_v4_pfx}1/24")
        ip(f"-6 addr add dev {self._ns.nsims[0].ifname} {self.nsim_v6_pfx}1/64 nodad")
        ip(f" link set dev {self._ns.nsims[0].ifname} up")

        ip(f" addr add dev {self._ns_peer.nsims[0].ifname} {self.nsim_v4_pfx}2/24", ns=self._netns)
        ip(f"-6 addr add dev {self._ns_peer.nsims[0].ifname} {self.nsim_v6_pfx}2/64 nodad", ns=self._netns)
        ip(f" link set dev {self._ns_peer.nsims[0].ifname} up", ns=self._netns)

    def _check_env(self):
        """
        Validate the environment used for testing on a real device.

        At least one of LOCAL_V4 / LOCAL_V6 and one of REMOTE_V4 /
        REMOTE_V6 must be set, together with REMOTE_TYPE and REMOTE_ARGS.
        Each address family must be configured on both ends or on neither.

        Raises Exception listing the missing settings.
        """
        vars_needed = [
            ["LOCAL_V4", "LOCAL_V6"],
            ["REMOTE_V4", "REMOTE_V6"],
            ["REMOTE_TYPE"],
            ["REMOTE_ARGS"]
        ]
        missing = []

        for choice in vars_needed:
            # Any one entry of the group satisfies the requirement
            for entry in choice:
                if entry in self.env:
                    break
            else:
                missing.append(choice)
        # Make sure v4 / v6 configs are symmetric
        if ("LOCAL_V6" in self.env) != ("REMOTE_V6" in self.env):
            missing.append(["LOCAL_V6", "REMOTE_V6"])
        if ("LOCAL_V4" in self.env) != ("REMOTE_V4" in self.env):
            missing.append(["LOCAL_V4", "REMOTE_V4"])
        if missing:
            raise Exception("Invalid environment, missing configuration:", missing,
                            "Please see tools/testing/selftests/drivers/net/README.rst")

    def resolve_remote_ifc(self):
        """
        Find the name of the remote interface which holds the remote
        address(es).

        Returns the interface name. Raises Exception if no interface
        matches, if the v4 and v6 addresses are on different interfaces
        or if an address matches more than one interface.
        """
        v4 = v6 = None
        if self.remote_addr_v["4"]:
            v4 = ip("addr show to " + self.remote_addr_v["4"], json=True, host=self.remote)
        if self.remote_addr_v["6"]:
            v6 = ip("addr show to " + self.remote_addr_v["6"], json=True, host=self.remote)
        # Fail with a readable message rather than an index / type error
        # from the lookups below when nothing was found.
        if not v4 and not v6:
            raise Exception("Can't resolve remote interface name, no interface matches")
        if v4 and v6 and v4[0]["ifname"] != v6[0]["ifname"]:
            raise Exception("Can't resolve remote interface name, v4 and v6 don't match")
        if (v4 and len(v4) > 1) or (v6 and len(v6) > 1):
            raise Exception("Can't resolve remote interface name, multiple interfaces match")
        return v6[0]["ifname"] if v6 else v4[0]["ifname"]

    def __del__(self):
        """
        Tear down whatever was created: both netdevsim devices, the
        network namespace and the remote endpoint. Safe to call
        repeatedly, each reference is cleared once released.
        """
        if self._ns:
            self._ns.remove()
            self._ns = None
        if self._ns_peer:
            self._ns_peer.remove()
            self._ns_peer = None
        if self._netns:
            del self._netns
            self._netns = None
        if self.remote:
            del self.remote
            self.remote = None

    def require_ipver(self, ipver):
        """
        Skip the test (KsftSkipEx) unless both ends have an address of
        the given IP version, ipver is "4" or "6".
        """
        if not self.addr_v[ipver] or not self.remote_addr_v[ipver]:
            raise KsftSkipEx(f"Test requires IPv{ipver} connectivity")

    def require_nsim(self):
        """
        Raise KsftXfailEx unless the environment runs on netdevsim.
        """
        if self._ns is None:
            raise KsftXfailEx("Test only works on netdevsim")

    def _require_cmd(self, comm, key, host=None):
        """
        Check whether command comm is available, using "command -v".

        The result is cached per command under key ("local" / "remote"),
        host selects where the check is executed. Returns True if the
        check exited with 0.
        """
        cached = self._required_cmd.get(comm, {})
        if cached.get(key) is None:
            cached[key] = cmd("command -v -- " + comm, fail=False,
                              shell=True, host=host).ret == 0
        self._required_cmd[comm] = cached
        return cached[key]

    def require_cmd(self, comm, local=True, remote=False):
        """
        Skip the test (KsftSkipEx) if command comm is not available
        locally (local=True) and / or on the remote endpoint (remote=True).
        """
        if local:
            if not self._require_cmd(comm, "local"):
                raise KsftSkipEx("Test requires command: " + comm)
        if remote:
            if not self._require_cmd(comm, "remote", host=self.remote):
                raise KsftSkipEx("Test requires (remote) command: " + comm)

    def wait_hw_stats_settle(self):
        """
        Wait for HW stats to become consistent, some devices DMA HW stats
        periodically so events won't be reflected until next sync.
        Good drivers will tell us via ethtool what their sync period is.
        """
        if self._stats_settle_time is None:
            data = {}
            try:
                data = ethtool("-c " + self.ifname, json=True)[0]
            except CmdExitFailure as e:
                # No coalescing support just means no reported period
                if "Operation not supported" not in e.cmd.stderr:
                    raise

            # Fixed 25 ms margin plus the reported period (usecs -> secs)
            self._stats_settle_time = 0.025 + \
                data.get('stats-block-usecs', 0) / 1000 / 1000

        time.sleep(self._stats_settle_time)