# SPDX-License-Identifier: GPL-2.0

import os
import time
from pathlib import Path
from lib.py import KsftSkipEx, KsftXfailEx
from lib.py import ksft_setup
from lib.py import cmd, ethtool, ip, CmdExitFailure
from lib.py import NetNS, NetdevSimDev
from .remote import Remote


class NetDrvEnvBase:
    """
    Base class for NIC / host environments.

    Holds the path of the test which constructed the env and the
    environment dict built from os.environ plus the optional "net.config"
    file located next to that test.
    """
    def __init__(self, src_path):
        self.src_path = src_path
        self.env = self._load_env_file()

    def rpath(self, path):
        """
        Get an absolute path to a file based on a path relative to the directory
        containing the test which constructed env.

        For example, if the test.py is in the same directory as
        a binary (built from helper.c), the test can use env.rpath("helper")
        to get the absolute path to the binary
        """
        src_dir = Path(self.src_path).parent.resolve()
        return (src_dir / path).as_posix()

    def _load_env_file(self):
        """
        Build the environment for the test.

        Starts from a copy of os.environ and, if a "net.config" file exists
        in the test's directory, overlays the KEY=VALUE pairs found there.
        Everything after a '#' on a line is treated as a comment, blank
        lines are skipped. The resulting dict is passed through
        ksft_setup() and its return value is returned.

        Raises Exception if a non-empty line does not contain '='.
        """
        env = os.environ.copy()

        src_dir = Path(self.src_path).parent.resolve()
        cfg_path = src_dir / "net.config"
        if not cfg_path.exists():
            return ksft_setup(env)

        with open(cfg_path.as_posix(), 'r') as fp:
            for raw_line in fp:
                # Strip comments
                line = raw_line
                pos = line.find("#")
                if pos >= 0:
                    line = line[:pos]
                line = line.strip()
                if not line:
                    continue
                pair = line.split('=', maxsplit=1)
                if len(pair) != 2:
                    # Report the untouched line so the user can find it
                    raise Exception("Can't parse configuration line:", raw_line)
                env[pair[0]] = pair[1]
        return ksft_setup(env)


class NetDrvEnv(NetDrvEnvBase):
    """
    Class for a single NIC / host env, with no remote end
    """
    def __init__(self, src_path, nsim_test=None, **kwargs):
        """
        Use the interface named by NETIF in the env if it is set, otherwise
        create a netdevsim device (kwargs are passed to NetdevSimDev).

        nsim_test=True restricts the test to netdevsim, nsim_test=False
        restricts it to real interfaces; KsftXfailEx is raised when
        the restriction is not met.
        """
        super().__init__(src_path)

        self._ns = None

        if 'NETIF' in self.env:
            if nsim_test is True:
                raise KsftXfailEx("Test only works on netdevsim")

            self.dev = ip("-d link show dev " + self.env['NETIF'], json=True)[0]
        else:
            if nsim_test is False:
                raise KsftXfailEx("Test does not work on netdevsim")

            self._ns = NetdevSimDev(**kwargs)
            self.dev = self._ns.nsims[0].dev
        self.ifname = self.dev['ifname']
        self.ifindex = self.dev['ifindex']

    def __enter__(self):
        ip(f"link set dev {self.dev['ifname']} up")

        return self

    def __exit__(self, ex_type, ex_value, ex_tb):
        """
        __exit__ gets called at the end of a "with" block.
        """
        self.__del__()

    def __del__(self):
        # getattr() because the finalizer also runs for objects whose
        # __init__ raised before the attribute was assigned.
        if getattr(self, "_ns", None):
            self._ns.remove()
            self._ns = None


class NetDrvEpEnv(NetDrvEnvBase):
    """
    Class for an environment with a local device and "remote endpoint"
    which can be used to send traffic in.

    For local testing it creates two network namespaces and a pair
    of netdevsim devices.
    """

    # Network prefixes used for local tests
    nsim_v4_pfx = "192.0.2."
    nsim_v6_pfx = "2001:db8::"

    def __init__(self, src_path, nsim_test=None):
        """
        Use the interface and remote described by the env (NETIF, LOCAL_V*,
        REMOTE_V*, REMOTE_TYPE, REMOTE_ARGS) if NETIF is set, otherwise
        build a local netdevsim pair with the peer in a new netns.

        nsim_test=True restricts the test to netdevsim, nsim_test=False
        restricts it to real interfaces; KsftXfailEx is raised when
        the restriction is not met.
        """
        super().__init__(src_path)

        self._stats_settle_time = None

        # Things we try to destroy
        self.remote = None
        # These are for local testing state
        self._netns = None
        self._ns = None
        self._ns_peer = None

        self.addr_v = {"4": None, "6": None}
        self.remote_addr_v = {"4": None, "6": None}

        if "NETIF" in self.env:
            if nsim_test is True:
                raise KsftXfailEx("Test only works on netdevsim")
            self._check_env()

            self.dev = ip("-d link show dev " + self.env['NETIF'], json=True)[0]

            self.addr_v["4"] = self.env.get("LOCAL_V4")
            self.addr_v["6"] = self.env.get("LOCAL_V6")
            self.remote_addr_v["4"] = self.env.get("REMOTE_V4")
            self.remote_addr_v["6"] = self.env.get("REMOTE_V6")
            kind = self.env["REMOTE_TYPE"]
            args = self.env["REMOTE_ARGS"]
        else:
            if nsim_test is False:
                raise KsftXfailEx("Test does not work on netdevsim")

            self.create_local()

            self.dev = self._ns.nsims[0].dev

            self.addr_v["4"] = self.nsim_v4_pfx + "1"
            self.addr_v["6"] = self.nsim_v6_pfx + "1"
            self.remote_addr_v["4"] = self.nsim_v4_pfx + "2"
            self.remote_addr_v["6"] = self.nsim_v6_pfx + "2"
            kind = "netns"
            args = self._netns.name

        self.remote = Remote(kind, args, src_path)

        # IPv6 is preferred whenever it is configured
        self.addr_ipver = "6" if self.addr_v["6"] else "4"
        self.addr = self.addr_v[self.addr_ipver]
        self.remote_addr = self.remote_addr_v[self.addr_ipver]

        # Bracketed addresses, some commands need IPv6 to be inside []
        self.baddr = f"[{self.addr_v['6']}]" if self.addr_v["6"] else self.addr_v["4"]
        self.remote_baddr = f"[{self.remote_addr_v['6']}]" if self.remote_addr_v["6"] else self.remote_addr_v["4"]

        self.ifname = self.dev['ifname']
        self.ifindex = self.dev['ifindex']

        # resolve remote interface name
        self.remote_ifname = self.resolve_remote_ifc()

        # Cache of command availability: {command: {"local"/"remote": bool}}
        self._required_cmd = {}

    def create_local(self):
        """
        Create the local test topology: a new netns, one netdevsim device
        in the current netns and one in the new netns, linked together via
        the netdevsim 'link_device' control file, with the nsim_v4_pfx /
        nsim_v6_pfx addresses .1 (local) and .2 (peer) assigned and both
        links brought up.
        """
        self._netns = NetNS()
        self._ns = NetdevSimDev()
        self._ns_peer = NetdevSimDev(ns=self._netns)

        # link_device takes "<netns fd>:<ifindex>" for each end
        with open("/proc/self/ns/net") as nsfd0, \
             open("/var/run/netns/" + self._netns.name) as nsfd1:
            ifi0 = self._ns.nsims[0].ifindex
            ifi1 = self._ns_peer.nsims[0].ifindex
            NetdevSimDev.ctrl_write('link_device',
                                    f'{nsfd0.fileno()}:{ifi0} {nsfd1.fileno()}:{ifi1}')

        ip(f" addr add dev {self._ns.nsims[0].ifname} {self.nsim_v4_pfx}1/24")
        ip(f"-6 addr add dev {self._ns.nsims[0].ifname} {self.nsim_v6_pfx}1/64 nodad")
        ip(f" link set dev {self._ns.nsims[0].ifname} up")

        ip(f" addr add dev {self._ns_peer.nsims[0].ifname} {self.nsim_v4_pfx}2/24", ns=self._netns)
        ip(f"-6 addr add dev {self._ns_peer.nsims[0].ifname} {self.nsim_v6_pfx}2/64 nodad", ns=self._netns)
        ip(f" link set dev {self._ns_peer.nsims[0].ifname} up", ns=self._netns)

    def _check_env(self):
        """
        Validate that the env describes both ends of the link.

        Each inner list in vars_needed is a set of alternatives, at least
        one of which must be present. Raises Exception listing the missing
        groups if the configuration is incomplete or asymmetric.
        """
        vars_needed = [
            ["LOCAL_V4", "LOCAL_V6"],
            ["REMOTE_V4", "REMOTE_V6"],
            ["REMOTE_TYPE"],
            ["REMOTE_ARGS"]
        ]
        missing = [choice for choice in vars_needed
                   if not any(entry in self.env for entry in choice)]

        # Make sure v4 / v6 configs are symmetric
        if ("LOCAL_V6" in self.env) != ("REMOTE_V6" in self.env):
            missing.append(["LOCAL_V6", "REMOTE_V6"])
        if ("LOCAL_V4" in self.env) != ("REMOTE_V4" in self.env):
            missing.append(["LOCAL_V4", "REMOTE_V4"])
        if missing:
            raise Exception("Invalid environment, missing configuration:", missing,
                            "Please see tools/testing/selftests/drivers/net/README.rst")

    def resolve_remote_ifc(self):
        """
        Return the name of the remote interface which carries the remote
        address(es), looked up by running "ip addr show to <addr>" on
        the remote.

        Raises Exception if the v4 and v6 addresses sit on different
        interfaces, if an address matches more than one interface, or if
        no interface matches at all.
        """
        v4 = v6 = None
        if self.remote_addr_v["4"]:
            v4 = ip("addr show to " + self.remote_addr_v["4"], json=True, host=self.remote)
        if self.remote_addr_v["6"]:
            v6 = ip("addr show to " + self.remote_addr_v["6"], json=True, host=self.remote)
        if v4 and v6 and v4[0]["ifname"] != v6[0]["ifname"]:
            raise Exception("Can't resolve remote interface name, v4 and v6 don't match")
        if (v4 and len(v4) > 1) or (v6 and len(v6) > 1):
            raise Exception("Can't resolve remote interface name, multiple interfaces match")
        if not v4 and not v6:
            # Without this the indexing below fails with an opaque
            # IndexError / TypeError
            raise Exception("Can't resolve remote interface name, no interface matches")
        return v6[0]["ifname"] if v6 else v4[0]["ifname"]

    def __enter__(self):
        return self

    def __exit__(self, ex_type, ex_value, ex_tb):
        """
        __exit__ gets called at the end of a "with" block.
        """
        self.__del__()

    def __del__(self):
        # getattr() because the finalizer also runs for objects whose
        # __init__ raised before the attributes were assigned.
        if getattr(self, "_ns", None):
            self._ns.remove()
            self._ns = None
        if getattr(self, "_ns_peer", None):
            self._ns_peer.remove()
            self._ns_peer = None
        if getattr(self, "_netns", None):
            del self._netns
            self._netns = None
        if getattr(self, "remote", None):
            del self.remote
            self.remote = None

    def require_ipver(self, ipver):
        """
        Raise KsftSkipEx unless both the local and the remote address of
        the given IP version ("4" or "6") are configured.
        """
        if not self.addr_v[ipver] or not self.remote_addr_v[ipver]:
            raise KsftSkipEx(f"Test requires IPv{ipver} connectivity")

    def _require_cmd(self, comm, key, host=None):
        """
        Return True if "command -v" finds comm on the given host
        (host=None is passed straight to cmd()). The result is cached
        under self._required_cmd[comm][key].
        """
        cached = self._required_cmd.get(comm, {})
        if cached.get(key) is None:
            cached[key] = cmd("command -v -- " + comm, fail=False,
                              shell=True, host=host).ret == 0
        self._required_cmd[comm] = cached
        return cached[key]

    def require_cmd(self, comm, local=True, remote=False):
        """
        Raise KsftSkipEx if command comm is not available locally
        (when local is set) or on the remote endpoint (when remote is set).
        """
        if local:
            if not self._require_cmd(comm, "local"):
                raise KsftSkipEx("Test requires command: " + comm)
        if remote:
            # The probe must be executed on the remote endpoint, not locally
            if not self._require_cmd(comm, "remote", host=self.remote):
                raise KsftSkipEx("Test requires (remote) command: " + comm)

    def wait_hw_stats_settle(self):
        """
        Wait for HW stats to become consistent, some devices DMA HW stats
        periodically so events won't be reflected until next sync.
        Good drivers will tell us via ethtool what their sync period is.
        """
        if self._stats_settle_time is None:
            data = {}
            try:
                data = ethtool("-c " + self.ifname, json=True)[0]
            except CmdExitFailure as e:
                # Devices without coalescing support just get the base delay
                if "Operation not supported" not in e.cmd.stderr:
                    raise

            # 25 msec of base delay plus the reported period (usecs -> secs)
            self._stats_settle_time = 0.025 + \
                data.get('stats-block-usecs', 0) / 1000 / 1000

        time.sleep(self._stats_settle_time)